Skip to content

Demonstrates how to submit a job to Spark on HDP directly via YARN's REST API from any workstation

Notifications You must be signed in to change notification settings

bernhard-42/spark-yarn-rest-api

Repository files navigation

1 Preparation on the HDP cluster

Note: The description and code of this repository is for HDP 2.3.2.x and 2.4.0.0

For Hortonworks Data Platform 2.4 the Spark assembly file is not on HDFS. It is helpful to (or ask the HDP admin to) copy the assembly to its default location hdfs://hdp//hdp/apps/2.4.0.0-169/spark//spark/

  • HDP 2.3.2:
    • Version: 2.3.2.0-2950
    • Spark Jar: /usr/hdp/2.3.2.0-2950/spark/lib/spark-assembly-1.4.1.2.3.2.0-2950-hadoop2.7.1.2.3.2.0-2950.jar
  • HDP 2.4.0:
    • Version: 2.4.0.0-169
    • Spark Jar: /usr/hdp/2.4.0.0-169/spark/lib/spark-assembly-1.6.0.2.4.0.0-169-hadoop2.7.1.2.4.0.0-169.jar
sudo su - hdfs
HDP_VERSION=2.4.0.0-169
SPARK_JAR=spark-assembly-1.6.0.2.4.0.0-169-hadoop2.7.1.2.4.0.0-169.jar

hdfs dfs -mkdir "/hdp/apps/${HDP_VERSION}/spark/"
hdfs dfs -put "/usr/hdp/${HDP_VERSION}/spark/lib/$SPARK_JAR" "/hdp/apps/${HDP_VERSION}/spark/spark-hdp-assembly.jar"

2 Load data and copy it into HDFS

Only a small data set, however sufficient for a sample

wget https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data

Create a project folder in HDFS using WebHDFS

curl -X PUT "${WEBHDFS_HOST}:50070/webhdfs/v1/tmp/simple-project?op=MKDIRS"
# {"boolean":true}

Upload data to project folder using WebHDFS

curl -i -X PUT "${WEBHDFS_HOST}/webhdfs/v1/tmp/simple-project/iris.data?op=CREATE&overwrite=true"
# HTTP/1.1 307 TEMPORARY_REDIRECT
# Cache-Control: no-cache
# Expires: Sun, 10 Apr 2016 11:35:44 GMT
# Date: Sun, 10 Apr 2016 11:35:44 GMT
# Pragma: no-cache
# Expires: Sun, 10 Apr 2016 11:35:44 GMT
# Date: Sun, 10 Apr 2016 11:35:44 GMT
# Pragma: no-cache
# Location: http://<<NameNode>>:50075/webhdfs/v1/tmp/simple-project/simple-project_2.10-1.0.jar?op=CREATE&namenoderpcaddress=<<NameNode>>:8020&createflag=&# createparent=true&overwrite=true
# Content-Type: application/octet-stream
# Content-Length: 0
# Server: Jetty(6.1.26.hwx)

LOCATION="http://<<NameNode>>:50075/webhdfs/v1/tmp/simple-project/simple-project_2.10-1.0.jar?op=CREATE&namenoderpcaddress=<<NameNode>>:8020&createflag=&createparent=true&overwrite=true"

curl -i -X PUT -T "iris.data" "${LOCATION}"

3 Submit a project to Spark from your workstation via python script

Note: Install requests module: pip install requests

3.1 Create Spark project and copy it to HDFS

Simple project to calculate mean of each feature per species

cd simple-project

Copy the right iris.sbt-HDP* to iris.sbt

sbt package

export APP_FILE=simple-project_2.10-1.0.jar

curl -i -X PUT "${WEBHDFS_HOST}/webhdfs/v1/tmp/simple-project/${APP_FILE}?op=CREATE&overwrite=true"
# take Location header, see above

LOCATION="http://..."

curl -i -X PUT -T "target/scala-2.10/${APP_FILE}" "${LOCATION}"

cd ..

3.2 Submit job

Notes:

  • If you use Knox with Basic Authentication to access the cluster, set the KNOX_CREDENTIALS environment variable

     export KNOX_CREDENTIALS=<<username>>:<<password>>
  • If the cluster behind Knox is or should be kerberized, look at ./Readme-Knox-Kerberos.md

Copy project.cfg.template to project.cfg, edit hostnames and call

python spark-remote-submit.py
# Checking project folder ...
# Uploading App Jar ...
# Uploading Spark properties
# Creating Spark Job file ...
# Submitting Spark Job ...
# 
# ==> Job tracking URL: http://${HADOOP_RM}:8088/ws/v1/cluster/apps/application_1460392460492_0025

3.3 Track job

curl -s http://${HADOOP_RM}:8088/ws/v1/cluster/apps/application_1460392460492_0025 | jq .
# {
#   "app": {
#     "id": "application_1460392460492_0025",
#     "user": "dr.who",
#     "name": "SimpleProject",
#     "queue": "default",
#     "state": "FINISHED",
#     "finalStatus": "SUCCEEDED",
#     "progress": 100,
#     "trackingUI": "History",
#     "trackingUrl": "http://<<NameNode>>:8088/proxy/application_1460392460492_0025/",
#     "diagnostics": "",
#     "clusterId": 1460392460492,
#     "applicationType": "YARN",
#     "applicationTags": "",
#     "startedTime": 1460413592029,
#     "finishedTime": 1460413612191,
#     "elapsedTime": 20162,
#     "amContainerLogs": "http://<<NodeManager>>:8042/node/containerlogs/container_e03_1460392460492_0025_01_000001/dr.who",
#     "amHostHttpAddress": "<<ApplicationMasterHost>>:8042",
#     "allocatedMB": -1,
#     "allocatedVCores": -1,
#     "runningContainers": -1,
#     "memorySeconds": 85603,
#     "vcoreSeconds": 51,
#     "preemptedResourceMB": 0,
#     "preemptedResourceVCores": 0,
#     "numNonAMContainerPreempted": 0,
#     "numAMContainerPreempted": 0,
#     "logAggregationStatus": "SUCCEEDED"
#   }
# }

4 Manually submit a project to Spark from your workstation

4.1 Create Spark project and copy it to HDFS

see 3.1

4.2 Populate the control files for the YARN REST API

4.2.1 Spark properties

Copy spark-yarn.properties.template to spark-yarn.properties and edit keys if necessary.

Upload spark-yarn.properties to the project folder in HDFS

curl -i -X PUT "${WEBHDFS_HOST}/webhdfs/v1/tmp/simple-project/spark-yarn.properties?op=CREATE&overwrite=true"
# take Location header, see above

LOCATION="http://..."
curl -i -X PUT -T "spark-yarn.properties" "$LOCATION"

4.2.2 The JSON job file for the YARN REST API

For caching purposes Spark needs file sizes and modification times of all project files. The following commands use the json processor jq from https://stedolan.github.io/jq/

curl -s "${WEBHDFS_HOST}/webhdfs/v1/hdp/apps/2.4.0.0-169/spark/spark-hdp-assembly.jar?op=GETFILESTATUS" \
| jq '.FileStatus | {size: .length, timestamp: .modificationTime}'
# {
#   "size": 191724610,
#   "timestamp": 1460219553714
# }
curl -s "${WEBHDFS_HOST}/webhdfs/v1/tmp/simple-project/simple-project_2.10-1.0.jar?op=GETFILESTATUS" \
| jq '.FileStatus | {size: .length, timestamp: .modificationTime}'
# {
#   "size": 10270,
#   "timestamp": 1460288240001
# }
curl -s "${WEBHDFS_HOST}/webhdfs/v1/tmp/simple-project/spark-yarn.properties?op=GETFILESTATUS" \
| jq '.FileStatus | {size: .length, timestamp: .modificationTime}'
# {
#   "size": 767,
#   "timestamp": 1460289956356
# }

Copy spark-yarn.json.template to spark-yarn.json and edit all local-resources:

  • resource: adapt namenode address
  • timestamp, size: according to the above values

Next edit the enviroments section and modify the keys SPARK_YARN_CACHE_FILES, SPARK_YARN_CACHE_FILES_FILE_SIZES, SPARK_YARN_CACHE_FILES_TIME_STAMPS so that file names, timestamps and sizes are the same as in the local_resources section.

Note: The properties file is only for the Application Master and can be ignored here.

Also adapt versions in CLASSPATH of section environment and in the command

4.3 Submit a Spark job to YARN

4.3.1 Create a YARN application

curl -s -X POST ${HADOOP_RM}/ws/v1/cluster/apps/new-application | jq .
# {
#   "application-id": "application_1460195242962_0051",
#   "maximum-resource-capability": {
#     "memory": 4000,
#     "vCores": 3
#   }
# }

Edit spark-yarn.json again and modify the application-id to hold the newly create id.

4.3.2 Submit the Spark job

curl -s -i -X POST -H "Content-Type: application/json" ${HADOOP_RM}/ws/v1/cluster/apps --data-binary spark-yarn.json 
# HTTP/1.1 100 Continue
# 
# HTTP/1.1 202 Accepted
# Cache-Control: no-cache
# Expires: Sun, 10 Apr 2016 13:02:47 GMT
# Date: Sun, 10 Apr 2016 13:02:47 GMT
# Pragma: no-cache
# Expires: Sun, 10 Apr 2016 13:02:47 GMT
# Date: Sun, 10 Apr 2016 13:02:47 GMT
# Pragma: no-cache
# Content-Type: application/json
# Location: http://<<Resourcemanager>>:8088/ws/v1/cluster/apps/application_1460195242962_0054
# Content-Length: 0
# Server: Jetty(6.1.26.hwx)

4.3.3 Get job status and result

Take the Location header from above:

curl -s "http://<<Resourcemanager>>:8088/ws/v1/cluster/apps/application_1460195242962_0054" | jq .
# {
#   "app": {
#     "id": "application_1460195242962_0054",
#     "user": "dr.who",
#     "name": "IrisApp",
#     "queue": "default",
#     "state": "FINISHED",
#     "finalStatus": "SUCCEEDED",
#     "progress": 100,
#     "trackingUI": "History",
#     "trackingUrl": "http://<<ResourceManager>>:8088/proxy/application_1460195242962_0054/",
#     "diagnostics": "",
#     "clusterId": 1460195242962,
#     "applicationType": "YARN",
#     "applicationTags": "",
#     "startedTime": 1460293367576,
#     "finishedTime": 1460293413568,
#     "elapsedTime": 45992,
#     "amContainerLogs": "http://<<NodeManager>>:8042/node/containerlogs/container_e29_1460195242962_0054_01_000001/dr.who",
#     "amHostHttpAddress": "<<ApplicationMasterHost>>:8042",
#     "allocatedMB": -1,
#     "allocatedVCores": -1,
#     "runningContainers": -1,
#     "memorySeconds": 172346,
#     "vcoreSeconds": 112,
#     "queueUsagePercentage": 0,
#     "clusterUsagePercentage": 0,
#     "preemptedResourceMB": 0,
#     "preemptedResourceVCores": 0,
#     "numNonAMContainerPreempted": 0,
#     "numAMContainerPreempted": 0,
#     "logAggregationStatus": "SUCCEEDED"
#   }
# }

#5 Get the result

Note: The partition name depends on run, find it via WebHDFS and LISTSTATUS

curl -s -L ${WEBHDFS_HOST}/webhdfs/v1/tmp/iris/means/part-r-00000-a1d003bf-246b-47b5-9d61-10dede1c3981?op=OPEN | jq .
# {
#   "species": "Iris-setosa",
#   "avg(sepalLength)": 5.005999999999999,
#   "avg(sepalWidth)": 3.4180000000000006,
#   "avg(petalLength)": 1.464,
#   "avg(petalWidth)": 0.2439999999999999
# }
# {
#   "species": "Iris-versicolor",
#   "avg(sepalLength)": 5.936,
#   "avg(sepalWidth)": 2.77,
#   "avg(petalLength)": 4.26,
#   "avg(petalWidth)": 1.3260000000000003
# }
# {
#   "species": "Iris-virginica",
#   "avg(sepalLength)": 6.587999999999998,
#   "avg(sepalWidth)": 2.9739999999999998,
#   "avg(petalLength)": 5.552,
#   "avg(petalWidth)": 2.026
# }

About

Demonstrates how to submit a job to Spark on HDP directly via YARN's REST API from any workstation

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published