Upgrade Spark and PySpark version to 3.3.0 (#146)
Co-authored-by: Navin Soni <[email protected]>
navinsoni and navinns authored Aug 23, 2022
1 parent 0dfda76 commit fd0ec0b
Showing 18 changed files with 128 additions and 99 deletions.
2 changes: 2 additions & 0 deletions buildspec-deploy.yml
@@ -18,6 +18,8 @@ phases:

build:
commands:
- export SBT_OPTS="-Xms1024M -Xmx4G -Xss2M -XX:MaxMetaspaceSize=2G"

# ignore reuse error to allow retry of this build stage
# when sonatype step has transient error
- publish-pypi-package --ignore-reuse-error $CODEBUILD_SRC_DIR_ARTIFACT_1/sagemaker-pyspark-sdk/dist/sagemaker_pyspark-*.tar.gz
10 changes: 4 additions & 6 deletions buildspec-release.yml
@@ -18,23 +18,21 @@ phases:

build:
commands:
- export SBT_OPTS="-Xms1024M -Xmx4G -Xss2M -XX:MaxMetaspaceSize=2G"

# prepare the release (update versions, changelog etc.)
- git-release --prepare

# spark unit tests and package (no coverage)
- cd $CODEBUILD_SRC_DIR/sagemaker-spark-sdk
- AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= AWS_SESSION_TOKEN=
AWS_CONTAINER_CREDENTIALS_RELATIVE_URI=
sbt -Dsbt.log.noformat=true clean test package
- sbt -Dsbt.log.noformat=true clean test package

# pyspark linters, package and doc build tests
- cd $CODEBUILD_SRC_DIR/sagemaker-pyspark-sdk
- tox -e flake8,twine,sphinx

# pyspark unit tests (no coverage)
- AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= AWS_SESSION_TOKEN=
AWS_CONTAINER_CREDENTIALS_RELATIVE_URI= IGNORE_COVERAGE=-
tox -e py27,py36 -- tests/
- tox -e py37 -- tests/

# todo consider adding subset of integration tests

22 changes: 11 additions & 11 deletions buildspec.yml
@@ -10,7 +10,7 @@ phases:
- export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/bin

# install sbt launcher
- curl -LO https://github.com/sbt/sbt/releases/download/v1.1.6/sbt-1.1.6.tgz
- curl -LO https://github.com/sbt/sbt/releases/download/v1.7.1/sbt-1.7.1.tgz
- tar -xf sbt-*.tgz
- export PATH=$CODEBUILD_SRC_DIR/sbt/bin/:$PATH
- cd $CODEBUILD_SRC_DIR/sagemaker-spark-sdk
@@ -26,13 +26,13 @@ phases:

build:
commands:
- export SBT_OPTS="-Xms1024M -Xmx4G -Xss2M -XX:MaxMetaspaceSize=2G"

# build spark sdk first, since pyspark package depends on it (even linters)

# spark unit tests
- cd $CODEBUILD_SRC_DIR/sagemaker-spark-sdk
- AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= AWS_SESSION_TOKEN=
AWS_CONTAINER_CREDENTIALS_RELATIVE_URI=
sbt -Dsbt.log.noformat=true clean coverage test coverageReport
- sbt -Dsbt.log.noformat=true clean coverage test coverageReport

# rebuild without coverage instrumentation
- cd $CODEBUILD_SRC_DIR/sagemaker-spark-sdk
@@ -41,16 +41,16 @@ phases:
# pyspark linters and unit tests
- cd $CODEBUILD_SRC_DIR/sagemaker-pyspark-sdk
- tox -e flake8,twine,sphinx
- AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= AWS_SESSION_TOKEN=
AWS_CONTAINER_CREDENTIALS_RELATIVE_URI=
tox -e py36,stats -- tests/
- tox -e py37,stats -- tests/

# spark integration tests
- cd $CODEBUILD_SRC_DIR/integration-tests/sagemaker-spark-sdk
- test_cmd="sbt -Dsbt.log.noformat=true it:test"
- execute-command-if-has-matching-changes "$test_cmd" "src/" "test/" "build.sbt" "buildspec.yml"
- sbt -Dsbt.log.noformat=true it:test
# - test_cmd="sbt -Dsbt.log.noformat=true it:test"
# - execute-command-if-has-matching-changes "$test_cmd" "src/" "test/" "build.sbt" "buildspec.yml"

# pyspark integration tests
- cd $CODEBUILD_SRC_DIR/sagemaker-pyspark-sdk
- test_cmd="IGNORE_COVERAGE=- tox -e py36 -- $CODEBUILD_SRC_DIR/integration-tests/sagemaker-pyspark-sdk/tests/ -n 10 --boxed --reruns 2"
- execute-command-if-has-matching-changes "$test_cmd" "src/" "tests/" "setup.*" "requirements.txt" "tox.ini" "buildspec.yml"
- IGNORE_COVERAGE=- tox -e py37 -- $CODEBUILD_SRC_DIR/integration-tests/sagemaker-pyspark-sdk/tests/ -n 10 --boxed --reruns 2
# - test_cmd="IGNORE_COVERAGE=- tox -e py37 -- $CODEBUILD_SRC_DIR/integration-tests/sagemaker-pyspark-sdk/tests/ -n 10 --boxed --reruns 2"
# - execute-command-if-has-matching-changes "$test_cmd" "src/" "tests/" "setup.*" "requirements.txt" "tox.ini" "buildspec.yml"
48 changes: 25 additions & 23 deletions sagemaker-pyspark-sdk/setup.py
@@ -36,17 +36,19 @@ def read_version():
print("Could not create dir {0}".format(TEMP_PATH), file=sys.stderr)
exit(1)

p = subprocess.Popen("sbt printClasspath".split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd="../sagemaker-spark-sdk/")
p = subprocess.Popen(
"sbt printClasspath".split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd="../sagemaker-spark-sdk/",
)

output, errors = p.communicate()

classpath = []
# Java Libraries to include.
java_libraries = ['aws', 'sagemaker', 'hadoop', 'htrace']
for line in output.decode('utf-8').splitlines():
java_libraries = ["aws", "sagemaker", "hadoop", "htrace"]
for line in output.decode("utf-8").splitlines():
path = str(line.strip())
if path.endswith(".jar") and os.path.exists(path):
jar = os.path.basename(path).lower()
@@ -65,8 +67,10 @@ def read_version():

else:
if not os.path.exists(JARS_TARGET):
print("You need to be in the sagemaker-pyspark-sdk root folder to package",
file=sys.stderr)
print(
"You need to be in the sagemaker-pyspark-sdk root folder to package",
file=sys.stderr,
)
exit(-1)

setup(
@@ -76,32 +80,30 @@ def read_version():
author="Amazon Web Services",
url="https://github.com/aws/sagemaker-spark",
license="Apache License 2.0",
python_requires=">= 3.7",
zip_safe=False,

packages=["sagemaker_pyspark",
"sagemaker_pyspark.algorithms",
"sagemaker_pyspark.transformation",
"sagemaker_pyspark.transformation.deserializers",
"sagemaker_pyspark.transformation.serializers",
"sagemaker_pyspark.jars",
"sagemaker_pyspark.licenses"],

packages=[
"sagemaker_pyspark",
"sagemaker_pyspark.algorithms",
"sagemaker_pyspark.transformation",
"sagemaker_pyspark.transformation.deserializers",
"sagemaker_pyspark.transformation.serializers",
"sagemaker_pyspark.jars",
"sagemaker_pyspark.licenses",
],
package_dir={
"sagemaker_pyspark": "src/sagemaker_pyspark",
"sagemaker_pyspark.jars": "deps/jars",
"sagemaker_pyspark.licenses": "licenses"
"sagemaker_pyspark.licenses": "licenses",
},
include_package_data=True,

package_data={
"sagemaker_pyspark.jars": ["*.jar"],
"sagemaker_pyspark.licenses": ["*.txt"]
"sagemaker_pyspark.licenses": ["*.txt"],
},

scripts=["bin/sagemakerpyspark-jars", "bin/sagemakerpyspark-emr-jars"],

install_requires=[
"pyspark==2.4.0",
"pyspark==3.3.0",
"numpy",
],
)
@@ -380,7 +380,7 @@ def __init__(self,
if uid is None:
uid = Identifiable._randomUID()

kwargs = locals()
kwargs = locals().copy()
del kwargs['self']
super(XGBoostSageMakerEstimator, self).__init__(**kwargs)

20 changes: 10 additions & 10 deletions sagemaker-pyspark-sdk/tests/namepolicy_test.py
@@ -28,29 +28,29 @@ def with_spark_context():
def test_CustomNamePolicyFactory():
policy_factory = CustomNamePolicyFactory("jobName", "modelname", "epconfig", "ep")
java_obj = policy_factory._to_java()
assert(isinstance(java_obj, JavaObject))
assert(java_obj.getClass().getSimpleName() == "CustomNamePolicyFactory")
assert (isinstance(java_obj, JavaObject))
assert (java_obj.getClass().getSimpleName() == "CustomNamePolicyFactory")
policy_name = java_obj.createNamePolicy().getClass().getSimpleName()
assert(policy_name == "CustomNamePolicy")
assert (policy_name == "CustomNamePolicy")


def test_CustomNamePolicyWithTimeStampSuffixFactory():
policy_factory = CustomNamePolicyWithTimeStampSuffixFactory("jobName", "modelname",
"epconfig", "ep")
java_obj = policy_factory._to_java()
assert(isinstance(java_obj, JavaObject))
assert (isinstance(java_obj, JavaObject))
assert (java_obj.getClass().getSimpleName() == "CustomNamePolicyWithTimeStampSuffixFactory")
policy_name = java_obj.createNamePolicy().getClass().getSimpleName()
assert(policy_name == "CustomNamePolicyWithTimeStampSuffix")
assert (policy_name == "CustomNamePolicyWithTimeStampSuffix")


def test_CustomNamePolicyWithTimeStampSuffix():
name_policy = CustomNamePolicyWithTimeStampSuffix("jobName", "modelname", "epconfig", "ep")
assert(isinstance(name_policy._to_java(), JavaObject))
assert(name_policy._call_java("trainingJobName") != "jobName")
assert(name_policy._call_java("modelName") != "modelname")
assert(name_policy._call_java("endpointConfigName") != "epconfig")
assert(name_policy._call_java("endpointName") != "ep")
assert (isinstance(name_policy._to_java(), JavaObject))
assert (name_policy._call_java("trainingJobName") != "jobName")
assert (name_policy._call_java("modelName") != "modelname")
assert (name_policy._call_java("endpointConfigName") != "epconfig")
assert (name_policy._call_java("endpointName") != "ep")

assert (name_policy._call_java("trainingJobName").startswith("jobName"))
assert (name_policy._call_java("modelName").startswith("modelname"))
6 changes: 3 additions & 3 deletions sagemaker-pyspark-sdk/tox.ini
@@ -1,5 +1,5 @@
[tox]
envlist = flake8,twine,sphinx,py36,stats
envlist = flake8,twine,sphinx,py37,stats
skip_missing_interpreters = False

[testenv]
@@ -38,8 +38,8 @@ basepython = python3
deps =
twine>=1.12.0
commands =
python setup.py sdist
twine check dist/*.tar.gz
- python setup.py sdist
- twine check dist/*.tar.gz

[testenv:flake8]
basepython=python3
25 changes: 15 additions & 10 deletions sagemaker-spark-sdk/build.sbt
@@ -14,11 +14,11 @@ scmInfo := Some(
)
licenses := Seq("Apache License, Version 2.0" -> url("https://aws.amazon.com/apache2.0"))

scalaVersion := "2.11.7"
scalaVersion := "2.12.16"

// to change the version of spark add -DSPARK_VERSION=2.x.x when running sbt
// for example: "sbt -DSPARK_VERSION=2.1.1 clean compile test doc package"
val sparkVersion = System.getProperty("SPARK_VERSION", "2.4.0")
val sparkVersion = System.getProperty("SPARK_VERSION", "3.3.0")

lazy val SageMakerSpark = (project in file("."))

@@ -29,16 +29,18 @@ version := {
}

libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-aws" % "2.8.1",
"com.amazonaws" % "aws-java-sdk-s3" % "1.11.835",
"com.amazonaws" % "aws-java-sdk-sts" % "1.11.835",
"com.amazonaws" % "aws-java-sdk-sagemaker" % "1.11.835",
"com.amazonaws" % "aws-java-sdk-sagemakerruntime" % "1.11.835",
"org.apache.hadoop" % "hadoop-aws" % "3.3.1",
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.262",
"com.amazonaws" % "aws-java-sdk-sts" % "1.12.262",
"com.amazonaws" % "aws-java-sdk-sagemaker" % "1.12.262",
"com.amazonaws" % "aws-java-sdk-sagemakerruntime" % "1.12.262",
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.scalatest" %% "scalatest" % "3.0.4" % "test",
"org.mockito" % "mockito-all" % "1.10.19" % "test"
"org.scoverage" %% "scalac-scoverage-plugin" % "1.4.2" % "provided",
"org.scalatest" %% "scalatest" % "3.0.9" % "test",
"org.scala-sbt" %% "compiler-bridge" % "1.7.1" % "test",
"org.mockito" % "mockito-all" % "2.0.2-beta" % "test"
)

// add a task to print the classpath. Also use the packaged JAR instead
@@ -48,8 +50,11 @@ lazy val printClasspath = taskKey[Unit]("Dump classpath")
printClasspath := (fullClasspath in Runtime value) foreach { e => println(e.data) }

// set coverage threshold
coverageMinimum := 90
coverageFailOnMinimum := true
coverageMinimumStmtTotal := 90
coverageMinimumBranchTotal := 90
coverageMinimumStmtPerPackage := 83
coverageMinimumBranchPerPackage := 75

// make scalastyle gate the build
(compile in Compile) := {
2 changes: 1 addition & 1 deletion sagemaker-spark-sdk/project/build.properties
@@ -1 +1 @@
sbt.version=1.2.1
sbt.version=1.7.1
2 changes: 1 addition & 1 deletion sagemaker-spark-sdk/project/plugins.sbt
@@ -1,4 +1,4 @@
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.8.1")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.0")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.0")
@@ -18,8 +18,8 @@ package com.amazonaws.services.sagemaker.sparksdk
import java.time.Duration
import java.util.UUID

import scala.collection.JavaConversions._
import scala.collection.immutable.Map
import scala.jdk.CollectionConverters._

import com.amazonaws.SdkBaseException
import com.amazonaws.retry.RetryUtils
@@ -225,14 +225,15 @@ class SageMakerEstimator(val trainingImage: String,
* @return a SageMaker hyper-parameter map
*/
private[sparksdk] def makeHyperParameters() : java.util.Map[String, String] = {
val trainingJobHyperParameters : java.util.Map[String, String] =
new java.util.HashMap(hyperParameters)
val trainingJobHyperParameters : scala.collection.mutable.Map[String, String] =
scala.collection.mutable.Map() ++ hyperParameters

params.filter(p => hasDefault(p) || isSet(p)) map {
case p => (p.name, this.getOrDefault(p).toString)
} foreach {
case (key, value) => trainingJobHyperParameters.put(key, value)
}
trainingJobHyperParameters
trainingJobHyperParameters.asJava
}

private[sparksdk] def resolveS3Path(s3Resource : S3Resource,
@@ -462,8 +463,8 @@ class SageMakerEstimator(val trainingImage: String,

try {
val objectList = s3Client.listObjects(s3Bucket, s3Prefix)
for (s3Object <- objectList.getObjectSummaries) {
s3Client.deleteObject(s3Bucket, s3Object.getKey)
objectList.getObjectSummaries.forEach{
s3Object => s3Client.deleteObject(s3Bucket, s3Object.getKey)
}
s3Client.deleteObject(s3Bucket, s3Prefix)
} catch {
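
Note on the two hunks above: they replace the implicit scala.collection.JavaConversions (deprecated since Scala 2.12, removed in 2.13) with explicit conversions. The hyperparameter map is now assembled on the Scala side and converted once with asJava at the Java boundary, and the S3 listing is walked with the Java forEach directly. Below is a minimal sketch of the explicit-converter pattern, with hypothetical names and values rather than code from this commit; scala.jdk.CollectionConverters ships with Scala 2.13 and is supplied to 2.12 builds by the scala-collection-compat shim.

import scala.jdk.CollectionConverters._

object ConvertersSketch {
  // Assemble mutable state on the Scala side, convert once when handing it to a Java API.
  def toJavaMap(hyperParameters: Map[String, String]): java.util.Map[String, String] = {
    val merged = scala.collection.mutable.Map[String, String]() ++ hyperParameters
    merged.put("maxDepth", "5") // stand-in for the Spark params copied in makeHyperParameters
    merged.asJava
  }

  // Walk a Java list with asScala, the explicit replacement for the old implicit wrapper.
  def keys(summaries: java.util.List[String]): Seq[String] =
    summaries.asScala.toSeq

  def main(args: Array[String]): Unit = {
    println(toJavaMap(Map("eta" -> "0.2")))
    println(keys(java.util.Arrays.asList("prefix/key1", "prefix/key2")))
  }
}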
@@ -72,6 +72,10 @@ class SageMakerProtobufWriter(path : String, context : TaskAttemptContext, dataS
write(converter(row))
}

override def path(): String = {
return path;
}

/**
* Writes a row to an underlying RecordWriter
*
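
The path() override added above satisfies an accessor that the Spark 3.x OutputWriter contract (org.apache.spark.sql.execution.datasources.OutputWriter) expects: a writer must report the file it is writing to. Below is a minimal sketch of that shape, assuming the Spark 3.3 API and using a hypothetical no-op writer rather than the project's protobuf implementation.

import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.OutputWriter

// Hypothetical writer; the point is the path() member required by Spark 3.x.
class NoOpWriter(outputPath: String, context: TaskAttemptContext) extends OutputWriter {
  override def write(row: InternalRow): Unit = ()   // a real writer serializes the row here
  override def close(): Unit = ()
  override def path(): String = outputPath
}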
@@ -28,6 +28,7 @@ import org.mockito.Matchers.any
import org.mockito.Mockito._
import org.scalatest._
import org.scalatest.mockito.MockitoSugar
import scala.language.postfixOps

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.param.{BooleanParam, IntParam, Param}
@@ -25,7 +25,7 @@ import org.mockito.Mockito.when
import org.scalatest.BeforeAndAfter
import org.scalatest.FlatSpec
import org.scalatest.mockito.MockitoSugar
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql._