
Commit 5ff04dc

Unify python examples and update run scripts (#111)
* Unify the PySpark examples (it's unclear why they were separated) and add a script to run them, the same as the SQL examples.
* Make the run scripts more flexible.
* Change how we trigger the out-of-memory (OOM) failure.
* Skip the doctest of the OOM example, since it puts the SparkContext into a bad state.
* Add a missing quote and disable ShellCheck rule SC2046.
1 parent 79acfc1 commit 5ff04dc

8 files changed (+102 / -79 lines)


.github/workflows/ci.yml

Lines changed: 15 additions & 0 deletions
@@ -58,6 +58,21 @@ jobs:
       - name: Run sql examples
         run:
           ./run_sql_examples.sh
+  run-pyspark-examples:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v2
+      - name: Cache Spark and friends
+        uses: actions/cache@v3
+        with:
+          path: |
+            spark*.tgz
+            iceberg*.jar
+          key: spark-artifacts
+      - name: Run PySpark examples
+        run:
+          ./run_pyspark_examples.sh
   style:
     runs-on: ubuntu-latest
     steps:

.gitignore

Lines changed: 2 additions & 1 deletion
@@ -77,4 +77,5 @@ warehouse/
 metastore_db/

 # Misc internal stuff
-sql/*.sql.out
+sql/*.sql.out
+python/examples/*.py.out

env_setup.sh

Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
+#!/bin/bash
+
+
+# Download Spark and iceberg if not present
+SPARK_MAJOR="3.4"
+SPARK_VERSION=3.4.1
+HADOOP_VERSION="3"
+SPARK_PATH="spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}"
+SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3.tgz"
+ICEBERG_VERSION="1.3.1"
+if [ ! -f "${SPARK_FILE}" ]; then
+  wget "https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/${SPARK_FILE}" &
+fi
+# Download Icberg if not present
+ICEBERG_FILE="iceberg-spark-runtime-${SPARK_MAJOR}_2.13-${ICEBERG_VERSION}.jar"
+if [ ! -f "${ICEBERG_FILE}" ]; then
+  wget "https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-${SPARK_MAJOR}_2.13/${ICEBERG_VERSION}/${ICEBERG_FILE}" -O "${ICEBERG_FILE}" &
+fi
+wait
+# Setup the env
+if [ ! -d "${SPARK_PATH}" ]; then
+  tar -xf ${SPARK_FILE}
+fi
+if [ ! -f "${SPARK_PATH}/jars/${ICEBERG_FILE}" ]; then
+  cp "${ICEBERG_FILE}" "${SPARK_PATH}/jars/${ICEBERG_FILE}"
+fi
+
+# Set up for running pyspark and friends
+export PATH=${SPARK_PATH}:${SPARK_PATH}/python:${SPARK_PATH}/bin:${SPARK_PATH}/sbin:${PATH}
+
+# Make sure we have a history directory
+mkdir -p /tmp/spark-events
+

high_performance_pyspark/SQLLineage.py renamed to python/examples/SQLLineage.py

Lines changed: 13 additions & 16 deletions
@@ -1,3 +1,13 @@
+from pyspark.sql import DataFrame, Row
+from pyspark.sql.session import SparkSession
+import sys
+
+global df
+global sc
+global rdd
+global spark
+
+
 """
 >>> df = rdd.toDF()
 >>> df2 = cutLineage(df)
@@ -7,14 +17,6 @@
 True
 """

-global df
-global sc
-global rdd
-global spark
-
-from pyspark.context import SparkContext
-from pyspark.sql import DataFrame, Row
-from pyspark.sql.session import SparkSession

 # tag::cutLineage[]
 def cutLineage(df):
@@ -31,11 +33,8 @@ def cutLineage(df):
     jSchema = df._jdf.schema()
     jRDD.cache()
     sqlCtx = df.sql_ctx
-    try:
-        javaSqlCtx = sqlCtx._jsqlContext
-    except:
-        javaSqlCtx = sqlCtx._ssql_ctx
-    newJavaDF = javaSqlCtx.createDataFrame(jRDD, jSchema)
+    javaSparkSession = sqlCtx._jSparkSession
+    newJavaDF = javaSparkSession.createDataFrame(jRDD, jSchema)
     newDF = DataFrame(newJavaDF, sqlCtx)
     return newDF

@@ -50,7 +49,7 @@ def _setupTest():
     sc.setLogLevel("ERROR")
     globs["sc"] = sc
     globs["spark"] = spark
-    globs["rdd"] = rdd = sc.parallelize(
+    globs["rdd"] = sc.parallelize(
         [
             Row(field1=1, field2="row1"),
             Row(field1=2, field2="row2"),
@@ -75,8 +74,6 @@ def _test():
         exit(-1)


-import sys
-
 if __name__ == "__main__":
     _test()
     # Hack to support running in nose
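
For reference, here is a sketch of how the whole cutLineage function reads after this change, with the unchanged surrounding lines filled in; the jRDD = df._jdf.toJavaRDD() line is an assumption based on earlier revisions of this example, and the doctest harness is omitted.

from pyspark.sql import DataFrame


def cutLineage(df):
    # Reach through Py4J for the Java DataFrame's RDD and schema.
    jRDD = df._jdf.toJavaRDD()
    jSchema = df._jdf.schema()
    jRDD.cache()
    sqlCtx = df.sql_ctx
    # Spark 3.x: use the Java SparkSession directly instead of probing the
    # older _jsqlContext / _ssql_ctx attributes.
    javaSparkSession = sqlCtx._jSparkSession
    newJavaDF = javaSparkSession.createDataFrame(jRDD, jSchema)
    # Wrap the fresh Java DataFrame back into a Python DataFrame.
    return DataFrame(newJavaDF, sqlCtx)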

high_performance_pyspark/bad_pyspark.py renamed to python/examples/bad_pyspark.py

Lines changed: 12 additions & 13 deletions
@@ -1,7 +1,7 @@
 # This script triggers a number of different PySpark errors

-from pyspark import *
 from pyspark.sql.session import SparkSession
+import sys

 global sc

@@ -131,22 +131,20 @@ def loggedDivZero(x):

 def runOutOfMemory(sc):
     """
-    Run out of memory on the workers.
-    In standalone modes results in a memory error, but in YARN may trigger YARN container
-    overhead errors.
-    >>> runOutOfMemory(sc)
+    Run out of memory on the workers from a skewed shuffle.
+    >>> runOutOfMemory(sc) # doctest: +SKIP
     Traceback (most recent call last):
     ...
     Py4JJavaError:...
     """
     # tag::worker_oom[]
-    data = sc.parallelize(range(10))
+    data = sc.parallelize(range(10000))

-    def generate_too_much(itr):
-        return range(10000000000000)
+    def generate_too_much(i: int):
+        return list(map(lambda v: (i % 2, v), range(100000 * i)))

-    itr = data.flatMap(generate_too_much)
-    itr.count()
+    bad = data.flatMap(generate_too_much).groupByKey()
+    bad.count()
     # end::worker_oom[]


@@ -166,17 +164,18 @@ def _test():
     """
     import doctest

-    globs = setupTest()
+    globs = _setupTest()
     (failure_count, test_count) = doctest.testmod(
         globs=globs, optionflags=doctest.ELLIPSIS
     )
+    print("All tests done, stopping Spark context.")
     globs["sc"].stop()
     if failure_count:
         exit(-1)
+    else:
+        exit(0)


-import sys
-
 if __name__ == "__main__":
     _test()
     # Hack to support running in nose
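
The new worker_oom block provokes the failure through a skewed shuffle rather than one enormous range: every record maps to one of only two keys, so groupByKey() has to materialize very large groups in just a couple of tasks. A standalone sketch of the same pattern (the local SparkSession setup here is an assumption for illustration; the example itself receives sc from its test harness):

from pyspark.sql.session import SparkSession

spark = SparkSession.builder.master("local[5]").getOrCreate()
sc = spark.sparkContext

data = sc.parallelize(range(10000))


def generate_too_much(i: int):
    # Only two distinct keys (i % 2), and the value lists grow with i,
    # so the shuffle is both large and badly skewed.
    return list(map(lambda v: (i % 2, v), range(100000 * i)))


bad = data.flatMap(generate_too_much).groupByKey()
bad.count()  # expected to fail with a Py4JJavaError once the workers run out of memory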

high_performance_pyspark/simple_perf.py renamed to python/examples/simple_perf.py

Lines changed: 6 additions & 20 deletions
@@ -4,8 +4,9 @@
 # should be taken as it depends on many private members that may change in
 # future releases of Spark.

-from pyspark.sql.types import *
-from pyspark.sql import *
+from pyspark.sql.types import StructType, IntegerType, DoubleType, StructField
+from pyspark.sql import DataFrame, SparkSession
+import sys
 import timeit
 import time

@@ -29,14 +30,7 @@ def generate_scale_data(sqlCtx, rows, numCols):
     """
     # tag::javaInterop[]
     sc = sqlCtx._sc
-    # Get the SQL Context, 2.1, 2.0 and pre-2.0 syntax - yay internals :p
-    try:
-        try:
-            javaSqlCtx = sqlCtx._jsqlContext
-        except:
-            javaSqlCtx = sqlCtx._ssql_ctx
-    except:
-        javaSqlCtx = sqlCtx._jwrapped
+    javaSparkSession = sqlCtx._jSparkSession
     jsc = sc._jsc
     scalasc = jsc.sc()
     gateway = sc._gateway
@@ -54,13 +48,9 @@ def generate_scale_data(sqlCtx, rows, numCols):
     schema = StructType(
         [StructField("zip", IntegerType()), StructField("fuzzyness", DoubleType())]
     )
-    # 2.1 / pre-2.1
-    try:
-        jschema = javaSqlCtx.parseDataType(schema.json())
-    except:
-        jschema = sqlCtx._jsparkSession.parseDataType(schema.json())
+    jschema = javaSparkSession.parseDataType(schema.json())
     # Convert the Java RDD to Java DataFrame
-    java_dataframe = javaSqlCtx.createDataFrame(java_rdd, jschema)
+    java_dataframe = javaSparkSession.createDataFrame(java_rdd, jschema)
     # Wrap the Java DataFrame into a Python DataFrame
     python_dataframe = DataFrame(java_dataframe, sqlCtx)
     # Convert the Python DataFrame into an RDD
@@ -143,13 +133,9 @@ def parseArgs(args):


 if __name__ == "__main__":
-
     """
     Usage: simple_perf_test scalingFactor size
     """
-    import sys
-    from pyspark import SparkContext
-    from pyspark.sql import SQLContext

     (scalingFactor, size) = parseArgs(sys.argv)
     session = SparkSession.appName("SimplePythonPerf").builder.getOrCreate()
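
The same simplification applies here: instead of probing several generations of SQLContext internals, the example now goes straight to the Java SparkSession. A minimal sketch of that interop path in isolation, mirroring the calls in the diff; the session construction and the _jsparkSession attribute are assumptions for illustration, and parseDataType / createDataFrame on the Java side are Spark internals that may change between releases.

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

spark = SparkSession.builder.appName("SimplePythonPerf").getOrCreate()

schema = StructType(
    [StructField("zip", IntegerType()), StructField("fuzzyness", DoubleType())]
)

# Python SparkSession -> Java SparkSession handle (internal attribute).
javaSparkSession = spark._jsparkSession
# Convert the Python schema to its Java counterpart via its JSON form,
# as the example does.
jschema = javaSparkSession.parseDataType(schema.json())

# Given a JavaRDD[Row] named java_rdd (built by the example's Scala helper),
# the Java DataFrame would then be created and wrapped like this:
# java_dataframe = javaSparkSession.createDataFrame(java_rdd, jschema)
# python_dataframe = DataFrame(java_dataframe, spark)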

run_pyspark_examples.sh

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
+#!/bin/bash
+
+source env_setup.sh
+
+pip install -r ./python/requirements.txt
+
+for ex in python/examples/*.py; do
+  # shellcheck disable=SC2046
+  spark-submit \
+    --master local[5] \
+    --conf spark.eventLog.enabled=true \
+    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
+    --conf spark.sql.catalog.spark_catalog.type=hive \
+    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
+    --conf spark.sql.catalog.local.type=hadoop \
+    --conf "spark.sql.catalog.local.warehouse=$PWD/warehouse" \
+    $(cat "${ex}.conf" || echo "") \
+    --name "${ex}" \
+    "${ex}" 2>&1 | tee -a "${ex}.out"
+done

run_sql_examples.sh

Lines changed: 1 addition & 29 deletions
@@ -1,35 +1,7 @@
 #!/bin/bash
 set -ex

-# Download Spark and iceberg if not present
-SPARK_MAJOR="3.4"
-SPARK_VERSION=3.4.1
-HADOOP_VERSION="3"
-SPARK_PATH="spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}"
-SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3.tgz"
-ICEBERG_VERSION="1.3.1"
-if [ ! -f "${SPARK_FILE}" ]; then
-  wget "https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/${SPARK_FILE}" &
-fi
-# Download Icberg if not present
-ICEBERG_FILE="iceberg-spark-runtime-${SPARK_MAJOR}_2.13-${ICEBERG_VERSION}.jar"
-if [ ! -f "${ICEBERG_FILE}" ]; then
-  wget "https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-${SPARK_MAJOR}_2.13/${ICEBERG_VERSION}/${ICEBERG_FILE}" -O "${ICEBERG_FILE}" &
-fi
-wait
-# Setup the env
-if [ ! -d "${SPARK_PATH}" ]; then
-  tar -xf ${SPARK_FILE}
-fi
-if [ ! -f "${SPARK_PATH}/jars/${ICEBERG_FILE}" ]; then
-  cp "${ICEBERG_FILE}" "${SPARK_PATH}/jars/${ICEBERG_FILE}"
-fi
-
-# Set up for running pyspark and friends
-export PATH=${SPARK_PATH}:${SPARK_PATH}/python:${SPARK_PATH}/bin:${SPARK_PATH}/sbin:${PATH}
-
-# Make sure we have a history directory
-mkdir -p /tmp/spark-events
+source env_setup.sh

 # We use `` for mid multi-line command comments. (see https://stackoverflow.com/questions/9522631/how-to-put-a-line-comment-for-a-multi-line-command).
 # For each SQL
