Amazon SageMaker PySpark

The SageMaker PySpark SDK provides a PySpark interface to Amazon SageMaker, allowing customers to train using the Spark Estimator API, host their model on Amazon SageMaker, and make predictions with their model using the Spark Transformer API. You can find the latest documentation at Read the Docs.

Table of Contents

  1. Quick Start
  2. Running on SageMaker Notebook Instances
  3. Development

Quick Start

sagemaker_pyspark works with Python 2.7 and Python 3.x. To install it, use pip:

$ pip install sagemaker_pyspark

You can also install sagemaker_pyspark from source:

$ git clone git@github.com:aws/sagemaker-spark.git
$ cd sagemaker-spark/sagemaker-pyspark-sdk
$ python setup.py install

Next, set up credentials (in e.g. ~/.aws/credentials):

[default]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY

Then, set up a default region (in e.g. ~/.aws/config):

[default]
region=us-west-2
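
As a quick sanity check before starting Spark, the sketch below parses INI text in the same shape as the two files above and reports which expected keys are missing. The helper name missing_aws_settings is hypothetical and not part of sagemaker_pyspark. It only looks at the [default] section, is written for Python 3, and does not validate the values with AWS.

```python
import configparser


def missing_aws_settings(credentials_text, config_text):
    """Return the expected keys that are absent from the [default] sections.

    Hypothetical helper for illustration: it checks that the keys exist,
    not that the credentials or region are valid.
    """
    missing = []

    credentials = configparser.ConfigParser()
    credentials.read_string(credentials_text)
    for key in ("aws_access_key_id", "aws_secret_access_key"):
        if not credentials.has_option("default", key):
            missing.append(key)

    config = configparser.ConfigParser()
    config.read_string(config_text)
    if not config.has_option("default", "region"):
        missing.append("region")

    return missing
```

To use it, read the contents of ~/.aws/credentials and ~/.aws/config into strings and pass them in; an empty list means all three settings are present.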

Then, to load the SageMaker jars programmatically:

import sagemaker_pyspark
from pyspark.sql import SparkSession

classpath = ":".join(sagemaker_pyspark.classpath_jars())
spark = SparkSession.builder.config("spark.driver.extraClassPath", classpath).getOrCreate()

Alternatively, pass the jars to your PySpark job via the --jars flag:

$ spark-submit --jars `bin/sagemakerpyspark-jars`

If you want to play around in interactive mode, the pyspark shell can be used too:

$ pyspark --jars `bin/sagemakerpyspark-jars`

You can also use the --packages flag and pass in the Maven coordinates for SageMaker Spark:

$ pyspark --packages com.amazonaws:sagemaker-spark_2.11:spark_2.1.1-1.0
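
The value passed to --packages follows the Maven groupId:artifactId:version layout. The sketch below splits such a string into its parts and reads the Scala binary version from the artifact suffix, which is the usual Scala naming convention. The names MavenCoordinate, parse_coordinate, and scala_suffix are hypothetical helpers for illustration, not part of Spark or sagemaker_pyspark.

```python
from collections import namedtuple

MavenCoordinate = namedtuple("MavenCoordinate", ["group_id", "artifact_id", "version"])


def parse_coordinate(coordinate):
    """Split a groupId:artifactId:version string into its three parts."""
    parts = coordinate.split(":")
    if len(parts) != 3 or not all(parts):
        raise ValueError("expected groupId:artifactId:version, got %r" % coordinate)
    return MavenCoordinate(*parts)


def scala_suffix(artifact_id):
    """Return the text after the last "_" in the artifact ID, or None."""
    _, separator, suffix = artifact_id.rpartition("_")
    return suffix if separator else None


coordinate = parse_coordinate("com.amazonaws:sagemaker-spark_2.11:spark_2.1.1-1.0")
```

Checking the suffix against the Scala version of your Spark build is a cheap way to catch a mismatched artifact before the shell starts.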

S3 File System Schemes

In PySpark, we recommend using "s3://" to access the EMR file system (EMRFS) in EMR and "s3a://" to access the S3A file system in other environments. Examples:

data_s3 = spark.read.format("libsvm").load("s3://some-bucket/some-prefix")
data_s3a = spark.read.format("libsvm").load("s3a://some-bucket/some-prefix")
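
If the same job runs both on EMR and elsewhere, the scheme choice can be isolated in one place. This is a minimal sketch: s3_uri is a hypothetical helper, and the on_emr flag is supplied by the caller, since detecting EMR automatically is outside the scope of this example.

```python
def s3_uri(bucket, prefix, on_emr):
    """Build an S3 URI using "s3" on EMR (EMRFS) and "s3a" elsewhere."""
    scheme = "s3" if on_emr else "s3a"
    # Drop a leading slash so the bucket and prefix are joined by exactly one.
    return "{}://{}/{}".format(scheme, bucket, prefix.lstrip("/"))
```

The result can be passed directly to load(), for example spark.read.format("libsvm").load(s3_uri("some-bucket", "some-prefix", on_emr=False)).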

Training and Hosting a K-Means Clustering model using SageMaker PySpark

A KMeansSageMakerEstimator runs a training job using the Amazon SageMaker KMeans algorithm upon invocation of fit(), returning a SageMakerModel.

from pyspark import SparkConf
from pyspark.sql import SparkSession
from sagemaker_pyspark import IAMRole, classpath_jars
from sagemaker_pyspark.algorithms import KMeansSageMakerEstimator

# Load the sagemaker_pyspark classpath. If you used --jars to submit your job
# there is no need to do this in code.
conf = (SparkConf()
        .set("spark.driver.extraClassPath", ":".join(classpath_jars())))
# The code below reads data through `spark`, so create a SparkSession here.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

iam_role = "arn:aws:iam::123456789012:role/MySageMakerRole"

region = "us-east-1"
training_data = (spark.read.format("libsvm").option("numFeatures", "784")
  .load("s3a://sagemaker-sample-data-{}/spark/mnist/train/".format(region)))

test_data = (spark.read.format("libsvm").option("numFeatures", "784")
  .load("s3a://sagemaker-sample-data-{}/spark/mnist/test/".format(region)))

kmeans_estimator = KMeansSageMakerEstimator(
    trainingInstanceType="ml.m4.xlarge",
    trainingInstanceCount=1,
    endpointInstanceType="ml.m4.xlarge",
    endpointInitialInstanceCount=1,
    sagemakerRole=IAMRole(iam_role))

kmeans_estimator.setK(10)
kmeans_estimator.setFeatureDim(784)

kmeans_model = kmeans_estimator.fit(training_data)

transformed_data = kmeans_model.transform(test_data)
transformed_data.show()

The SageMakerEstimator expects an input DataFrame with a column named "features" that holds a Spark ML Vector. The estimator also serializes a "label" column of Doubles if present. Other columns are ignored. The dimension of this input vector should be equal to the feature dimension given as a hyperparameter.
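
The dimension requirement can be checked before calling fit(). The sketch below uses plain Python dicts and lists to stand in for DataFrame rows and Spark ML Vectors, an assumption made so that it runs without Spark; check_feature_dim is a hypothetical helper, not part of sagemaker_pyspark.

```python
def check_feature_dim(rows, feature_dim):
    """Return the indices of rows whose "features" length is not feature_dim.

    `rows` is a list of dicts standing in for DataFrame rows; a row with no
    "features" entry is reported as well.
    """
    bad = []
    for index, row in enumerate(rows):
        features = row.get("features")
        if features is None or len(features) != feature_dim:
            bad.append(index)
    return bad
```

With a real DataFrame, the same comparison would be made against the size of the Vector in the "features" column.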

The Amazon SageMaker KMeans algorithm accepts many parameters, but K (the number of clusters) and FeatureDim (the number of features per Row) are required.

You can set other hyperparameters. For details on them, run:

kmeans_estimator.explainParams()

After training is complete, an Amazon SageMaker Endpoint is created to host the model and serve predictions. Upon invocation of transform(), the SageMakerModel predicts against the hosted model. Like the SageMakerEstimator, the SageMakerModel expects an input DataFrame with a column named "features" that holds a Spark ML Vector equal in dimension to the value of the FeatureDim parameter.

The PySpark API documentation for SageMaker Spark is available on Read the Docs.

Training and Hosting an XGBoost model using SageMaker PySpark

An XGBoostSageMakerEstimator runs a training job using the Amazon SageMaker XGBoost algorithm upon invocation of fit(), returning a SageMakerModel.

from pyspark import SparkConf
from pyspark.sql import SparkSession
from sagemaker_pyspark import IAMRole, classpath_jars
from sagemaker_pyspark.algorithms import XGBoostSageMakerEstimator

# Load the sagemaker_pyspark classpath. If you used --jars to submit your job
# there is no need to do this in code.
conf = (SparkConf()
        .set("spark.driver.extraClassPath", ":".join(classpath_jars())))
# The code below reads data through `spark`, so create a SparkSession here.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

iam_role = "arn:aws:iam::123456789012:role/MySageMakerRole"

region = "us-east-1"
training_data = (spark.read.format("libsvm").option("numFeatures", "784")
  .load("s3a://sagemaker-sample-data-{}/spark/mnist/train/".format(region)))

test_data = (spark.read.format("libsvm").option("numFeatures", "784")
  .load("s3a://sagemaker-sample-data-{}/spark/mnist/test/".format(region)))

xgboost_estimator = XGBoostSageMakerEstimator(
    trainingInstanceType="ml.m4.xlarge",
    trainingInstanceCount=1,
    endpointInstanceType="ml.m4.xlarge",
    endpointInitialInstanceCount=1,
    sagemakerRole=IAMRole(iam_role))

xgboost_estimator.setObjective('multi:softmax')
xgboost_estimator.setNumRound(25)
xgboost_estimator.setNumClasses(10)

xgboost_model = xgboost_estimator.fit(training_data)

transformed_data = xgboost_model.transform(test_data)
transformed_data.show()

The SageMakerEstimator expects an input DataFrame with a column named "features" that holds a Spark ML Vector. The estimator also serializes a "label" column of Doubles if present. Other columns are ignored.

The Amazon SageMaker XGBoost algorithm accepts many parameters. Objective (the learning objective of your model, in this case multi-class classification) and NumRound (the number of boosting rounds to run) are required. For multi-class classification, NumClasses (the number of classes to classify the data into) is required as well.
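
The rule about required parameters can be written down as a small check. This is a sketch under stated assumptions: the dict stands in for the estimator's settings, its keys use the XGBoost hyperparameter names objective, num_round, and num_class, and missing_xgboost_params is a hypothetical helper rather than part of sagemaker_pyspark.

```python
def missing_xgboost_params(params):
    """Return the required hyperparameters that are not set in `params`.

    objective and num_round are always required; num_class is also
    required when the objective is a multi-class one ("multi:...").
    """
    required = ["objective", "num_round"]
    objective = params.get("objective") or ""
    if objective.startswith("multi:"):
        required.append("num_class")
    return [name for name in required if params.get(name) is None]
```

For the example above, {"objective": "multi:softmax", "num_round": 25, "num_class": 10} passes the check, while leaving out num_class is reported.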

You can set other hyperparameters. For details on them, run:

xgboost_estimator.explainParams()

After training is complete, an Amazon SageMaker Endpoint is created to host the model and serve predictions. Upon invocation of transform(), the SageMakerModel predicts against the hosted model.

The PySpark API documentation for SageMaker Spark is available on Read the Docs.

Running on SageMaker Notebook Instances

sagemaker_pyspark comes pre-installed in the SageMaker Notebook Environment. Two use cases are supported:

  • running on local Spark
  • connecting to an EMR Spark cluster

Local Spark on SageMaker Notebook Instances

Create a notebook using the conda_python2 or conda_python3 kernel. Then initialize a Spark context as described in the Quick Start section:

import sagemaker_pyspark
from pyspark.sql import SparkSession

classpath = ":".join(sagemaker_pyspark.classpath_jars())
spark = SparkSession.builder.config("spark.driver.extraClassPath", classpath).getOrCreate()

Connecting to an EMR Spark Cluster

Note: Make sure your SageMaker Notebook instance can talk to your EMR Cluster. This means:

  • They are in the same VPC.
  • The EMR cluster security group allows inbound TCP traffic on port 8998 from the SageMaker Notebook security group.

Installing sagemaker_pyspark in a Spark EMR Cluster

sagemaker_pyspark works with EMR 5.8.0 (which runs Spark 2.2). To install sagemaker_pyspark in EMR:

Create a bootstrap script to install sagemaker_pyspark in your new EMR cluster:

#!/bin/bash

sudo pip install sagemaker_pyspark
sudo pip3 install sagemaker_pyspark

Upload this script to an S3 bucket:

$ aws s3 cp bootstrap.sh s3://your-bucket/prefix/

In the AWS Console, launch a new EMR Spark cluster and set s3://your-bucket/prefix/bootstrap.sh as the bootstrap script. Make sure to:

  • Run the Cluster in the same VPC as your SageMaker Notebook Instance.
  • Provide an SSH Key that you have access to, as there will be some manual configuration required.

Once the cluster is launched, log in to the master node:

$ ssh -i /path/to/ssh-key.pem hadoop@your-emr-cluster-public-dns

Create a backup of the default spark configuration:

$ cd /usr/lib/spark/conf
$ sudo cp spark-defaults.conf spark-defaults.conf.bk

Grab the EMR classpath from the installed sagemaker_pyspark:

$ sagemakerpyspark-emr-jars :

The output is a ":"-separated list of jar files. Copy the output and append it to the spark.driver.extraClassPath and spark.executor.extraClassPath entries of spark-defaults.conf.

Make sure that there is a ":" after the original classpath before you paste the sagemaker_pyspark classpath.
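
The manual edit can also be scripted. The sketch below works on the text of spark-defaults.conf, assuming the usual layout of one key followed by whitespace and a value per line. It appends the jar list to both classpath entries with a ":" separator and adds an entry if the file does not define it. append_to_classpaths is a hypothetical helper, not something shipped with sagemaker_pyspark.

```python
CLASSPATH_KEYS = ("spark.driver.extraClassPath", "spark.executor.extraClassPath")


def append_to_classpaths(conf_text, extra_classpath):
    """Return conf_text with extra_classpath appended to both classpath keys."""
    seen = set()
    lines = []
    for line in conf_text.splitlines():
        fields = line.split(None, 1)
        if fields and fields[0] in CLASSPATH_KEYS:
            key = fields[0]
            current = fields[1].strip().rstrip(":") if len(fields) > 1 else ""
            # Use ":" only when there is an existing classpath to extend.
            value = current + ":" + extra_classpath if current else extra_classpath
            line = "{} {}".format(key, value)
            seen.add(key)
        lines.append(line)
    # Add the entries that the file did not define at all.
    for key in CLASSPATH_KEYS:
        if key not in seen:
            lines.append("{} {}".format(key, extra_classpath))
    return "\n".join(lines) + "\n"
```

Run it against a copy of the file first and compare the result with the backup made earlier before overwriting the live configuration.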

Before configuring your Notebook instance, make sure the EMR cluster's security group allows inbound traffic on port 8998 from the Notebook instance's security group.
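
One way to confirm the port is reachable is to attempt a TCP connection from the notebook instance. This is a minimal sketch using only the standard library; can_connect is a hypothetical helper, and a successful connection shows only that the port is open, not that the service behind it is healthy.

```python
import socket


def can_connect(host, port=8998, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        connection = socket.create_connection((host, port), timeout=timeout)
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False
    connection.close()
    return True
```

Call it from a notebook cell or terminal with the private DNS name of the cluster's master node; False usually points to a security group or VPC mismatch.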

Configure your SageMaker Notebook instance to connect to the cluster

Open a terminal session in your notebook: new->terminal

Copy the default sparkmagic config

You can download it in your terminal using:

$ wget https://raw.githubusercontent.com/jupyter-incubator/sparkmagic/master/sparkmagic/example_config.json

In the kernel_python_credentials section, replace the url with http://your-cluster-private-dns-name:8998.
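
The same replacement can be made with a few lines of Python instead of a text editor. The sketch below assumes the config is a JSON object with a kernel_python_credentials section, as described above; set_python_kernel_url is a hypothetical helper and leaves every other setting untouched.

```python
import json


def set_python_kernel_url(config_text, url):
    """Return the config JSON text with kernel_python_credentials.url set to url."""
    config = json.loads(config_text)
    # Create the section if it is missing, then overwrite only the url.
    config.setdefault("kernel_python_credentials", {})["url"] = url
    return json.dumps(config, indent=2, sort_keys=True)
```

Read example_config.json into a string, pass it through the function with http://your-cluster-private-dns-name:8998, and write the result back to the file.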

Override the default sparkmagic config:

$ cp example_config.json ~/.sparkmagic/config.json

Launch a notebook using the Sparkmagic (PySpark) kernel. As soon as you run any code block, the notebook connects to your Spark cluster and gets a SparkSession for you.

Development

Getting Started

Since sagemaker_pyspark depends on the Scala Spark modules, you need to be able to build those. Follow the instructions here.

For the Python side, assuming that you have Python and virtualenv installed, set up your environment and install the required dependencies as follows, instead of using the pip install sagemaker_pyspark command above:

$ git clone https://github.com/aws/sagemaker-spark.git
$ cd sagemaker-spark/sagemaker-pyspark-sdk/
$ virtualenv venv
....
$ . venv/bin/activate
$ pip install -r requirements.txt
$ pip install -e .

Running Tests

Our recommended way of running the tests is using pyenv + pyenv-virtualenv. This allows you to test on different Python versions, and to test the installed distribution instead of your local files.

Install pyenv, pyenv-virtualenv and pyenv-virtualenvwrapper

You can do this on OS X using brew:

$ brew install pyenv pyenv-virtualenv pyenv-virtualenvwrapper

On Linux, follow the steps in each package's README, or install the packages from your distribution if it provides them.

Make sure to add the pyenv and virtualenv init functions to your shell init file (.bashrc, .zshrc, etc.):

eval "$(pyenv init -)"
eval "$(pyenv virtualenv-init -)"

Start a new shell once you do that to pick up your changes.

Set up the Python versions we need. At the moment we test with Python 2.7, 3.5, and 3.6, so install these versions:

$ pyenv install 2.7.10
$ pyenv install 3.5.2
$ pyenv install 3.6.2

Set them as global versions:

$ pyenv global 2.7.10 3.5.2 3.6.2

Verify they show up when you do:

$ pyenv versions

Restart your shell and run the command again to verify that it persists across shell sessions.

Now we just need to install tox to run our tests:

$ pip install tox

Run the tests:

$ tox