In this lab we will put the concepts that are central to Supercomputing with Big Data in some practical context. We will analyze a large open data set and identify a way of processing it efficiently using Apache Spark and Amazon Web Services (AWS). The data set in question is the GDELT 2.0 Global Knowledge Graph (GKG), which indexes persons, organizations, companies, locations, themes, and even emotions from live news reports in print, broadcast and internet sources all over the world. We will use this data to construct a histogram of the topics that are most popular on a given day, hopefully giving us some interesting insights into the most important themes in recent history.
Feedback is appreciated! The lab files will be hosted on GitHub. You can also find the most up-to-date version of this manual over there. Feel free to make issues and/or pull requests to suggest or implement improvements.
All assignments are completed in groups of two, so you and your prospective team mate should enroll in a group via Brightspace.
Furthermore, the complete data set we will be looking at in lab 2 weighs in at several terabytes, so we need some kind of compute and storage infrastructure to run the pipeline. In this lab we will use Amazon AWS to facilitate this. As a student you are eligible for credits on this platform. We would like you to register for the GitHub Student Developer Pack as soon as you decide to take this course. This gives you access to around 100 dollars' worth of credits, which should be ample to complete lab 2. Don't forget to register on AWS using the referral link from GitHub. Use the "AWS Account ID" option (requiring a credit card), if you can.
Make sure you register for these credits as soon as possible! You can always send an email to the TAs if you run into any trouble.
Finally, be sure to keep an eye on the Brightspace page for course updates and the tentative schedule and deadlines.
The goal of this lab is to:
- familiarize yourself with Apache Spark, the MapReduce programming model, and Scala as a programming language;
- learn how to characterize your big data problem analytically and practically and what machines best fit this profile;
- get hands-on experience with cloud-based systems;
- learn about the existing infrastructure for big data and the difficulties that come with it; and
- learn how an existing application should be modified to function in a streaming data context.
You will work in groups of two. In this lab manual we will introduce a big data pipeline for identifying important events from the GDELT Global Knowledge Graph (GKG).
In lab 1, you will start by writing a Spark application that processes the GDELT dataset. You will run this application on a small subset of data on your local computer. You will use this to
- get familiar with the Spark APIs,
- analyze the application’s scaling behavior, and
- draw some conclusions on how to run it efficiently in the cloud.
It is up to you how you define "efficiently": it can be in terms of performance, cost, or a combination of the two.
You may have noticed that the first lab does not contain any supercomputing, let alone big data. For lab 2, you will deploy your code on AWS, in an actual big data cluster, in an effort to scale up your application to process the complete dataset, which measures several terabytes. It is up to you to find the configuration that will get you the most efficiency, as per your definition in lab 1.
For the final lab, we will modify the code from lab 1 to work in a streaming data context. You will attempt to rewrite the application to process events in real-time, in a way that is still scalable over many machines.
In this first chapter, we will cover some of the concepts and technologies that are used during the course. We will introduce the following topics (in a different order):
- The GDELT project, a large database of "human society", constructed of mentions of "people, organizations, locations, themes, counts, images and emotions" around the planet. As mentioned before, we will use the GDELT database to construct a histogram of the most important themes during a certain timespan.
- Apache Spark, a framework for processing large amounts of data on multiple machines in a robust way. We will build our application for labs 1 and 2 using Spark.
- Amazon Web Services, or AWS, which provide theoretically unlimited compute infrastructure, allowing us to process a dataset as large as the entire GDELT database in lab 2.
- Apache Kafka, a framework for building so-called data pipelines, in which potentially many producers and consumers process real-time, streaming data. In lab 3, we will take the application from labs 1 and 2 and modify it to process data in real-time, using Kafka.
- Scala, a programming language that runs on the Java Virtual Machine (JVM). This is our (mandatory!) language of choice during the lab assignments. We will use it to program for both Apache Spark and Apache Kafka.
- Docker, an application that allows the user to package and run software (like Spark and Kafka and the programs we write for them) in an isolated environment: a container.
During the lab, we will use the GDELT Global Knowledge Graph version 2.0 (GKG v2.0). This database is basically a massive table that “connects every person, organization, location, count, theme, news source, and event across the planet into a single massive network that captures what’s happening around the world, what its context is and who’s involved, and how the world is feeling about it, every single day.”
The GKG is updated every 15 minutes and published as a series of compressed comma-separated values (CSV) files (in which the columns are actually separated using tabs). The first segment of the version 2 database was published back in 2015 and at the time of writing (September 2019) it contains 157378 segments with a total compressed file size of 1.4 TiB. Uncompressed, this comes down to about 4.2 terabytes of raw data.
Read this article for a general introduction to the dataset, this table for a quick overview of its columns and what they mean, and this document for a more in depth description.
During the labs, we will use the `AllNames` column, which lists all "proper names" (people, organizations, countries, etcetera) mentioned. It can be seen as a summary of many of the other columns.
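To give an impression of what this column contains, the sketch below splits an `AllNames` value into individual names. It assumes the `Name,CharOffset;Name,CharOffset;...` layout described in the GKG documentation; the value itself is made up for illustration.

// Hypothetical AllNames value, following the documented "Name,Offset;Name,Offset;..." layout.
val allNames = "Islamic State,149;Islamic State,601;White House,1489"

// Keep only the names, dropping the character offsets.
val names = allNames.split(";").map(_.split(",")(0)).toList
// names: List(Islamic State, Islamic State, White House)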
According to the Docker Documentation:
Docker is a platform for developers and sysadmins to develop, deploy, and run applications with containers. The use of Linux containers to deploy applications is called containerization. Containers are not new, but their use for easily deploying applications is. Containerization is increasingly popular because containers are:
- Flexible: Even the most complex applications can be containerized.
- Lightweight: Containers leverage and share the host kernel.
- Interchangeable: You can deploy updates and upgrades on-the-fly.
- Portable: You can build locally, deploy to the cloud, and run anywhere.
- Scalable: You can increase and automatically distribute container replicas.
- Stackable: You can stack services vertically and on-the-fly.
For this course, we use Docker primarily to ensure every student is using the exact same platform for their applications, and to avoid certain platform-specific issues and peculiarities.
A basic understanding of some Docker concepts helps in getting started with this course. Part 1: Orientation and setup of the Get Started Guide covers the basic Docker concepts used in this course.
Before trying the lab assignments and tutorials in the next sections, make sure you Install Docker (stable) and test your installation by running the simple Hello World image.
docker run hello-world
In order to run Spark in a container, a `Dockerfile` is provided which can be used to build images for `spark-submit` to run your Spark application, `spark-shell` to run a Spark interactive shell, and the Spark history server to view event logs from application runs. You need to build these images before you get started. The Dockerfiles we provide assume that you run Docker from the folder in which they are located. Don't move them around! They will stop working.
To build a Docker image from the Dockerfile, we use `docker build`:
docker build --target <target> -t <tag> .
Here `<target>` selects the target from the Dockerfile, `<tag>` sets the tag for the resulting image, and the `.` sets the build context to the current working directory.
We use `docker build` to build the images we need to use Spark and SBT.
- sbt[1]

  docker build \
    --build-arg BASE_IMAGE_TAG="8" \
    --build-arg SBT_VERSION="1.2.8" \
    --build-arg SCALA_VERSION="2.11.12" \
    -t hseeberger/scala-sbt \
    github.com/hseeberger/scala-sbt.git#:debian

- spark-shell

  docker build --target spark-shell -t spark-shell .

- spark-submit

  docker build --target spark-submit -t spark-submit .

- spark-history-server

  docker build --target spark-history-server -t spark-history-server .
You can then run the following commands from the Spark application root (the folder containing the `build.sbt` file). Please make sure to use the provided template project.
- Run SBT to package or test your application (`sbt <command>`):

  docker run -it --rm -v "`pwd`":/root hseeberger/scala-sbt sbt

- Start a Spark shell (`spark-shell`):

  docker run -it --rm -v "`pwd`":/io spark-shell

- Run your Spark application (`spark-submit`) (fill in the class name of your application and the name of your project!):

  docker run -it --rm -v "`pwd`":/io -v "`pwd`"/spark-events:/spark-events \
    spark-submit --class <YOUR_CLASSNAME> \
    target/scala-2.11/<YOUR_PROJECT_NAME>_2.11-1.0.jar

- Spawn the history server to view event logs, accessible at localhost:18080:

  docker run -it --rm -v "`pwd`"/spark-events:/spark-events \
    -p 18080:18080 spark-history-server
The rest of the manual will not generally mention these Docker commands again, so know that if we mention e.g. `spark-shell`, you should run the corresponding `docker run` command listed above. You can create scripts or aliases for your favorite shell to avoid having to type a lot.
Apache Spark, our big data framework of choice for this lab, is implemented in Scala, a compiled language on the JVM that supports a mix of functional and object-oriented programming. It is compatible with Java libraries. Some reasons why Spark was written in Scala are:
- Compiling to the JVM makes the codebase extremely portable and deploying applications as easy as sending the Java bytecode (typically packaged in a Java ARchive format, or JAR). This simplifies deploying to cloud provider big data platforms as we don't need specific knowledge of the operating system, or even the underlying architecture.
- Compared to Java, Scala has some advantages in supporting more complex types, type inference, and anonymous functions[2]. Matei Zaharia, Apache Spark's original author, has said the following about why Spark was implemented in Scala in a Reddit AMA:
At the time we started, I really wanted a PL that supports a language-integrated interface (where people write functions inline, etc), because I thought that was the way people would want to program these applications after seeing research systems that had it (specifically Microsoft’s DryadLINQ). However, I also wanted to be on the JVM in order to easily interact with the Hadoop filesystem and data formats for that. Scala was the only somewhat popular JVM language then that offered this kind of functional syntax and was also statically typed (letting us have some control over performance), so we chose that. Today there might be an argument to make the first version of the API in Java with Java 8, but we also benefitted from other aspects of Scala in Spark, like type inference, pattern matching, actor libraries, etc.
Apache Spark provides interfaces to Scala, R, Java and Python, but we will be using Scala to program in this lab. An introduction to Scala can be found on the Scala language site. You can have a brief look at it, but you can also pick up topics as you go through the lab.
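To give a first flavour of the language, the snippet below shows a few constructs that show up throughout this manual: immutable values, anonymous functions, tuples, and pattern matching. It is purely illustrative; none of the names are part of any API.

// Values are immutable bindings; types are usually inferred.
val numbers = List(1, 2, 3, 4)

// Anonymous functions are passed directly to higher-order methods.
val doubled = numbers.map(x => x * 2)   // List(2, 4, 6, 8)

// Tuples and pattern matching, as used in the RDD examples later on.
val pair: (Char, Int) = ('a', 1)
val incremented = pair match {
  case (chr, num) => (chr, num + 1)     // ('a', 2)
}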
Apache Spark provides a programming model around a resilient, distributed, shared-memory abstraction. To elaborate on this, Spark allows you to program against a unified view of memory (an RDD or DataFrame), while the processing is distributed over multiple nodes/machines/computers/servers and can compensate for failures of these nodes.
This allows us to define a computation and scale it over multiple machines without having to think about communication, distribution of data, and potential failures of nodes. This advantage comes at a cost: all applications have to comply with Spark's (restricted) programming model.
The programming model Spark exposes is based around the MapReduce paradigm. This is an important consideration when deciding whether to use Spark: does my problem fit into this paradigm?
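As a rule of thumb, a problem fits the paradigm if it can be phrased as mapping each input element to (key, value) pairs and then reducing per key. The classic word count is a minimal sketch of this shape (assuming `sc` is the SparkContext you get in a Spark shell; the input lines are made up):

// MapReduce-style word count: map each line to (word, 1) pairs,
// then reduce per key by summing the counts.
val lines = sc.parallelize(Seq("to be or not to be", "to do or not to do"))
val counts = lines
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
counts.collect.foreach(println)   // e.g. (to,4), (be,2), (or,2), ...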
Modern Spark exposes two APIs around this programming model:
- Resilient Distributed Datasets
- Spark SQL Dataframe/Datasets
In the rest of this section, we will demonstrate a simple application with implementations using both APIs.
RDDs are the original data abstraction used in Spark. Conceptually, one can think of an RDD as a large, unordered list of Java/Scala/Python objects; let's call these objects elements. This list of elements is divided into partitions (which may still contain multiple elements), which can reside on different machines. One can operate on these elements with a number of operations, which can be subdivided into wide and narrow dependencies, see tbl. 1. An illustration of the RDD abstraction can be seen in fig. 1.
RDDs are immutable, which means that the elements cannot be altered without creating a new RDD. Furthermore, the application of transformations (wide or narrow) is lazily evaluated, meaning that the actual computation will be delayed until results are requested (an action in Spark terminology). When applying transformations, these form a directed acyclic graph (DAG) that instructs workers which operations to perform on which elements to arrive at a specific result. This can be seen in fig. 1 as the arrows between elements.
| Narrow Dependency      | Wide Dependency |
|------------------------|-----------------|
| map                    | coGroup         |
| mapValues              | flatMap         |
| flatMap                | groupByKey      |
| filter                 | reduceByKey     |
| mapPartitions          | combineByKey    |
| mapPartitionsWithIndex | distinct        |
| join with sorted keys  | join            |
|                        | intersection    |
|                        | repartition     |
|                        | coalesce        |
|                        | sort            |
Table 1: List of wide and narrow dependencies for (pair) RDD operations
Now that you have an idea of what the abstraction is about, let's demonstrate some example code with the Spark shell. If you want to paste pieces of code from this guide into the Spark shell, it might be useful to copy from the GitHub version, and use the `:paste` command in the Spark shell to paste the code. Hit `ctrl+D` to stop pasting.
$ docker run -it --rm -v "`pwd`":/io spark-shell
19/09/08 14:00:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://af29447c6dcd:4040
Spark context available as 'sc' (master = local[*], app id = local-1567951261349).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.4
/_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_222)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark
res2: org.apache.spark.sql.SparkSession =
org.apache.spark.sql.SparkSession@48a32c4f
When opening a Spark Shell, by default you get a SparkSession and a SparkContext object. These objects contain the configuration of your session, i.e. whether you are running in local or cluster mode, the name of your application, the logging level, etc.
Going back to our shell, let's first create some sample data that we can demonstrate the RDD API around. Here we create an infinite list of repeating characters from 'a' to 'z'.
scala> val charsOnce = ('a' to 'z').toStream
charsOnce: scala.collection.immutable.Stream[Char] = Stream(a, ?)
scala> val chars: Stream[Char] = charsOnce #::: chars
chars: Stream[Char] = Stream(a, ?)
Now we build a collection with the first 200000 integers, zipped with the character stream. We display the first 30 results.
scala> val rdd = sc.parallelize(chars.zip(1 to 200000), numSlices=20)
rdd: org.apache.spark.rdd.RDD[(Char, Int)] =
ParallelCollectionRDD[0] at parallelize at <console>:26
scala> rdd.take(30)
res2: Array[(Char, Int)] = Array((a,1), (b,2), (c,3), (d,4), (e,5), (f,6),
(g,7), (h,8), (i,9), (j,10), (k,11), (l,12), (m,13), (n,14), (o,15), (p,16),
(q,17), (r,18), (s,19), (t,20), (u,21), (v,22), (w,23), (x,24), (y,25), (z,26),
(a,27), (b,28), (c,29), (d,30))
Let's dissect what just happened. We created a Scala object that is a list of tuples of `Char`s and `Int`s in the statement `(chars).zip(1 to 200000)`. With `sc.parallelize` we are transforming a Scala sequence into an RDD. This allows us to enter Spark's programming model. With the optional parameter `numSlices` we indicate in how many partitions we want to subdivide the sequence.
Let’s apply some (lazily evaluated) transformations to this RDD.
scala> val mappedRDD = rdd.map({case (chr, num) => (chr, num+1)})
mappedRDD: org.apache.spark.rdd.RDD[(Char, Int)] =
MapPartitionsRDD[5] at map at <console>:25
We apply a `map` to the RDD, applying a function to all the elements in the RDD. The function we apply pattern matches over the elements as being a tuple of `(Char, Int)`, and adds one to the integer. Scala's syntax can be a bit foreign, so if this is confusing, spend some time looking at tutorials and messing around in the Scala interpreter.
You might have noticed that the transformation completed awfully fast. This is Spark’s lazy evaluation in action. No computation will be performed until an action is applied.
scala> val reducedRDD = rdd.reduceByKey(_ + _)
reducedRDD: org.apache.spark.rdd.RDD[(Char, Int)] =
ShuffledRDD[6] at reduceByKey at <console>:25
Now we apply a `reduceByKey` operation, grouping all of the identical keys together and merging the results with the specified function, in this case the `+` operator.
Now we will perform an action, which will trigger the computation of the transformations on the data. We will use the collect action, which means to gather all the results to the master, going out of the Spark programming model, back to a Scala sequence. How many elements do you expect there to be in this sequence after the previous transformations?
scala> reducedRDD.collect
res3: Array[(Char, Int)] = Array((d,769300000), (x,769253844), (e,769307693),
(y,769261536), (z,769269228), (f,769315386), (g,769323079), (h,769330772),
(i,769138464), (j,769146156), (k,769153848), (l,769161540), (m,769169232),
(n,769176924), (o,769184616), (p,769192308), (q,769200000), (r,769207692),
(s,769215384), (t,769223076), (a,769276921), (u,769230768), (b,769284614),
(v,769238460), (w,769246152), (c,769292307))
Typically, we don't build the data first, but we actually load it from a database or file system. Say we have some data in (multiple) files in a specific format. As an example consider `sensordata.csv` (in the example folder). We can load it as follows
// sc.textFile can take multiple files as argument!
scala> val raw_data = sc.textFile("sensordata.csv")
raw_data: org.apache.spark.rdd.RDD[String] =
sensordata.csv MapPartitionsRDD[1] at textFile at <console>:24
scala> raw_data.take(10).foreach(println)
COHUTTA,3/10/14:1:01,10.27,1.73,881,1.56,85,1.94
COHUTTA,3/10/14:1:02,9.67,1.731,882,0.52,87,1.79
COHUTTA,3/10/14:1:03,10.47,1.732,882,1.7,92,0.66
COHUTTA,3/10/14:1:05,9.56,1.734,883,1.35,99,0.68
COHUTTA,3/10/14:1:06,9.74,1.736,884,1.27,92,0.73
COHUTTA,3/10/14:1:08,10.44,1.737,885,1.34,93,1.54
COHUTTA,3/10/14:1:09,9.83,1.738,885,0.06,76,1.44
COHUTTA,3/10/14:1:11,10.49,1.739,886,1.51,81,1.83
COHUTTA,3/10/14:1:12,9.79,1.739,886,1.74,82,1.91
COHUTTA,3/10/14:1:13,10.02,1.739,886,1.24,86,1.79
We can process this data to filter only measurements on `3/10/14:1:01`.
scala> val filterRDD = raw_data.map(_.split(","))
.filter(x => x(1) == "3/10/14:1:01")
filterRDD: org.apache.spark.rdd.RDD[Array[String]] =
MapPartitionsRDD[11] at filter at <console>:25
scala> filterRDD.foreach(a => println(a.mkString(" ")))
COHUTTA 3/10/14:1:01 10.27 1.73 881 1.56 85 1.94
LAGNAPPE 3/10/14:1:01 9.59 1.602 777 0.09 88 1.78
NANTAHALLA 3/10/14:1:01 10.47 1.712 778 1.96 76 0.78
CHER 3/10/14:1:01 10.17 1.653 777 1.89 96 1.57
THERMALITO 3/10/14:1:01 10.24 1.75 777 1.25 80 0.89
ANDOUILLE 3/10/14:1:01 10.26 1.048 777 1.88 94 1.66
BUTTE 3/10/14:1:01 10.12 1.379 777 1.58 83 0.67
MOJO 3/10/14:1:01 10.47 1.828 967 0.36 77 1.75
CARGO 3/10/14:1:01 9.93 1.903 778 0.55 76 1.44
BBKING 3/10/14:1:01 10.03 0.839 967 1.17 80 1.28
You might have noticed that this is a bit tedious to work with: we have to convert everything to Scala objects, and aggregations rely on having a pair RDD, which is fine when we have a single key, but becomes cumbersome to juggle for more complex aggregations.
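To make this point concrete, here is a small sketch of a per-sensor aggregation with the pair-RDD API, continuing from the `raw_data` RDD above. Note how the (key, value) pairs have to be constructed by hand before `reduceByKey` can be used.

// Count the number of measurements per sensor: we must manually build
// (key, value) pairs before we can aggregate with reduceByKey.
val countsPerSensor = raw_data
  .map(_.split(","))
  .map(fields => (fields(0), 1))
  .reduceByKey(_ + _)
countsPerSensor.take(5).foreach(println)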
Our previous example is quite a typical use case for Spark. We have a big data store of some structured (tabular) format (be it csv, JSON, parquet, or something else) that we would like to analyse, typically in some SQL-like fashion. Manually applying operations to rows like this is both labour-intensive and inefficient, because we have knowledge of the 'schema' of the data that is not being exploited. This is where DataFrames originate from. Spark has an optimized SQL query engine that can optimize the compute path as well as provide a more efficient representation of the rows when given a schema. From the Spark SQL, DataFrames and Datasets Guide:
Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL including SQL and the Dataset API. When computing a result the same execution engine is used, independent of which API/language you are using to express the computation. This unification means that developers can easily switch back and forth between different APIs based on which provides the most natural way to express a given transformation.
Under the hood, these are still immutable distributed collections of data (with the same compute graph semantics), only now Spark can apply extra optimizations because of the structured format.
Let’s do the same analysis as last time using this API. First we will define a schema. Let’s take a look at a single row of the csv:
COHUTTA,3/10/14:1:01,10.27,1.73,881,1.56,85,1.94
So first a string field, a date, a timestamp, and some numeric information. We can thus define the schema as such:
val schema =
StructType(
Array(
StructField("sensorname", StringType, nullable=false),
StructField("timestamp", TimestampType, nullable=false),
StructField("numA", DoubleType, nullable=false),
StructField("numB", DoubleType, nullable=false),
StructField("numC", LongType, nullable=false),
StructField("numD", DoubleType, nullable=false),
StructField("numE", LongType, nullable=false),
StructField("numF", DoubleType, nullable=false)
)
)
If we import types first, and then enter this in our interactive shell we get the following:
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.types._
val schema =
StructType(
Array(
StructField("sensorname", StringType, nullable=false),
StructField("timestamp", TimestampType, nullable=false),
StructField("numA", DoubleType, nullable=false),
StructField("numB", DoubleType, nullable=false),
StructField("numC", LongType, nullable=false),
StructField("numD", DoubleType, nullable=false),
StructField("numE", LongType, nullable=false),
StructField("numF", DoubleType, nullable=false)
)
)
// Exiting paste mode, now interpreting.
import org.apache.spark.sql.types._
schema: org.apache.spark.sql.types.StructType =
StructType(StructField(sensorname,StringType,false),
StructField(timestamp,TimestampType,false), StructField(numA,DoubleType,false),
StructField(numB,DoubleType,false), StructField(numC,LongType,false),
StructField(numD,DoubleType,false), StructField(numE,LongType,false),
StructField(numF,DoubleType,false))
An overview of the different Spark SQL types can be found online. For the timestamp field we need to specify the format according to the Java date format, in our case `MM/dd/yy:hh:mm`. Tying this all together we can build a Dataframe like so.
scala> :paste
// Entering paste mode (ctrl-D to finish)
val df = spark.read
.schema(schema)
.option("timestampFormat", "MM/dd/yy:hh:mm")
.csv("./sensordata.csv")
// Exiting paste mode, now interpreting.
df: org.apache.spark.sql.DataFrame =
[sensorname: string, timestamp: date ... 6 more fields]
scala> df.printSchema
root
|-- sensorname: string (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- numA: double (nullable = true)
|-- numB: double (nullable = true)
|-- numC: long (nullable = true)
|-- numD: double (nullable = true)
|-- numE: long (nullable = true)
 |-- numF: double (nullable = true)
scala> df.take(10).foreach(println)
[COHUTTA,2014-03-10 01:01:00.0,10.27,1.73,881,1.56,85,1.94]
[COHUTTA,2014-03-10 01:02:00.0,9.67,1.731,882,0.52,87,1.79]
[COHUTTA,2014-03-10 01:03:00.0,10.47,1.732,882,1.7,92,0.66]
[COHUTTA,2014-03-10 01:05:00.0,9.56,1.734,883,1.35,99,0.68]
[COHUTTA,2014-03-10 01:06:00.0,9.74,1.736,884,1.27,92,0.73]
[COHUTTA,2014-03-10 01:08:00.0,10.44,1.737,885,1.34,93,1.54]
[COHUTTA,2014-03-10 01:09:00.0,9.83,1.738,885,0.06,76,1.44]
[COHUTTA,2014-03-10 01:11:00.0,10.49,1.739,886,1.51,81,1.83]
[COHUTTA,2014-03-10 01:12:00.0,9.79,1.739,886,1.74,82,1.91]
[COHUTTA,2014-03-10 01:13:00.0,10.02,1.739,886,1.24,86,1.79]
We can perform the same filtering operation as before in a couple of ways. We can use really error-prone SQL queries (not recommended unless you absolutely love SQL and like debugging these command strings; this took me about 20 minutes to get right).
scala> df.createOrReplaceTempView("sensor")
scala> val dfFilter = spark.sql("SELECT * FROM sensor
WHERE timestamp=TIMESTAMP(\"2014-03-10 01:01:00\")")
// I think the newline in the multiline string breaks it if you paste it
dfFilter: org.apache.spark.sql.DataFrame =
[sensorname: string, timestamp: timestamp ... 6 more fields]
scala> dfFilter.collect.foreach(println)
[COHUTTA,2014-03-10 01:01:00.0,10.27,1.73,881,1.56,85,1.94]
[NANTAHALLA,2014-03-10 01:01:00.0,10.47,1.712,778,1.96,76,0.78]
[THERMALITO,2014-03-10 01:01:00.0,10.24,1.75,777,1.25,80,0.89]
[BUTTE,2014-03-10 01:01:00.0,10.12,1.379,777,1.58,83,0.67]
[CARGO,2014-03-10 01:01:00.0,9.93,1.903,778,0.55,76,1.44]
[LAGNAPPE,2014-03-10 01:01:00.0,9.59,1.602,777,0.09,88,1.78]
[CHER,2014-03-10 01:01:00.0,10.17,1.653,777,1.89,96,1.57]
[ANDOUILLE,2014-03-10 01:01:00.0,10.26,1.048,777,1.88,94,1.66]
[MOJO,2014-03-10 01:01:00.0,10.47,1.828,967,0.36,77,1.75]
[BBKING,2014-03-10 01:01:00.0,10.03,0.839,967,1.17,80,1.28]
A slightly more sane and type-safe way would be to do the following.
scala> val dfFilter = df.filter("timestamp = TIMESTAMP(\"2014-03-10 01:01:00\")")
dfFilter: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] =
[sensorname: string, timestamp: timestamp ... 6 more fields]
scala> dfFilter.collect.foreach(println)
[COHUTTA,2014-03-10 01:01:00.0,10.27,1.73,881,1.56,85,1.94]
[NANTAHALLA,2014-03-10 01:01:00.0,10.47,1.712,778,1.96,76,0.78]
[THERMALITO,2014-03-10 01:01:00.0,10.24,1.75,777,1.25,80,0.89]
[BUTTE,2014-03-10 01:01:00.0,10.12,1.379,777,1.58,83,0.67]
[CARGO,2014-03-10 01:01:00.0,9.93,1.903,778,0.55,76,1.44]
[LAGNAPPE,2014-03-10 01:01:00.0,9.59,1.602,777,0.09,88,1.78]
[CHER,2014-03-10 01:01:00.0,10.17,1.653,777,1.89,96,1.57]
[ANDOUILLE,2014-03-10 01:01:00.0,10.26,1.048,777,1.88,94,1.66]
[MOJO,2014-03-10 01:01:00.0,10.47,1.828,967,0.36,77,1.75]
[BBKING,2014-03-10 01:01:00.0,10.03,0.839,967,1.17,80,1.28]
But this is still quite error-prone, as these query strings get no typechecking. That is not a big deal when writing queries in an interactive environment on a small dataset, but it can be quite costly when a typo at the end of a long-running job means two hours of your (and the cluster's) time is wasted.
This is why the Spark community developed the Dataset abstraction. It is a sort of middle ground between Dataframes and RDDs, where you get some of the type safety of RDDs by operating on a case class (also known as product type). This allows us to use the compile-time typechecking on the product types, whilst still allowing Spark to optimize the query and storage of the data by making use of schemas.
Let's dive into some code. First we need to define a product type for a row.
scala> import java.sql.Timestamp
import java.sql.Timestamp
scala> :paste
// Entering paste mode (ctrl-D to finish)
case class SensorData (
sensorName: String,
timestamp: Timestamp,
numA: Double,
numB: Double,
numC: Long,
numD: Double,
numE: Long,
numF: Double
)
// Exiting paste mode, now interpreting.
defined class SensorData
Now we can convert a Dataframe (which actually is just an untyped Dataset) to a typed Dataset using the `as` method.
scala> :paste
// Entering paste mode (ctrl-D to finish)
val ds = spark.read .schema(schema)
.option("timestampFormat", "MM/dd/yy:hh:mm")
.csv("./sensordata.csv")
.as[SensorData]
// Exiting paste mode, now interpreting.
ds: org.apache.spark.sql.Dataset[SensorData] =
[sensorname: string, timestamp: timestamp ... 6 more fields]
Now we can apply compile time type-checked operations.
scala> val dsFilter = ds.filter(a => a.timestamp ==
new Timestamp(2014 - 1900, 2, 10, 1, 1, 0, 0))
dsFilter: org.apache.spark.sql.Dataset[SensorData] =
[sensorname: string, timestamp: timestamp ... 6 more fields]
scala> dsFilter.collect.foreach(println)
SensorData(COHUTTA,2014-03-10 01:01:00.0,10.27,1.73,881,1.56,85,1.94)
SensorData(NANTAHALLA,2014-03-10 01:01:00.0,10.47,1.712,778,1.96,76,0.78)
SensorData(THERMALITO,2014-03-10 01:01:00.0,10.24,1.75,777,1.25,80,0.89)
SensorData(BUTTE,2014-03-10 01:01:00.0,10.12,1.379,777,1.58,83,0.67)
SensorData(CARGO,2014-03-10 01:01:00.0,9.93,1.903,778,0.55,76,1.44)
SensorData(LAGNAPPE,2014-03-10 01:01:00.0,9.59,1.602,777,0.09,88,1.78)
SensorData(CHER,2014-03-10 01:01:00.0,10.17,1.653,777,1.89,96,1.57)
SensorData(ANDOUILLE,2014-03-10 01:01:00.0,10.26,1.048,777,1.88,94,1.66)
SensorData(MOJO,2014-03-10 01:01:00.0,10.47,1.828,967,0.36,77,1.75)
SensorData(BBKING,2014-03-10 01:01:00.0,10.03,0.839,967,1.17,80,1.28)
This provides us with more guarantees that our queries are valid (at least on a type level).
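The same type safety extends to aggregations. As a short continuation of the example (a sketch, not something you need for the lab), counting measurements per sensor on the typed Dataset could look like this; a typo in the field name would now be caught at compile time.

// Typed aggregation: groupByKey takes a function on SensorData itself.
val perSensor = ds.groupByKey(_.sensorName).count()
perSensor.collect.foreach(println)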
This was a brief overview of the 2 (or 3) different Spark APIs. You can always find more information in the programming guides for RDDs and Dataframes/Datasets and in the Spark documentation.
We showed how to run Spark in interactive mode. Now we will explain how to build applications that can be submitted using the `spark-submit` command.
First, we will explain how to structure a Scala project, using the SBT build tool. The typical project structure is
├── build.sbt
├── project
│ └── build.properties
└── src
└── main
└── scala
└── example.scala
This is typical for JVM languages. More directories are added under the `scala` folder to resemble the package structure.
The project's name, dependencies, and versioning are defined in the `build.sbt` file. An example `build.sbt` file is
name := "Example"
scalaVersion := "2.11.12"
This specifies the Scala version of the project (2.11.12) and the name of the project.
If you run `sbt` in this folder it will generate the `project` directory and `build.properties`. `build.properties` contains the SBT version that is used to build the project, for backwards compatibility.
Open `example.scala` and add the following
package example
object Example {
def main(args: Array[String]) {
println("Hello world!")
}
}
Start a `scala-sbt` container in the root folder (the one where `build.sbt` is located). This puts you in interactive mode of SBT. We can compile the sources by writing the `compile` command.
$ docker run -it --rm -v "`pwd`":/root hseeberger/scala-sbt sbt
Getting org.scala-sbt sbt 1.2.8 (this may take some time)...
...
[info] Loading settings for project root from build.sbt ...
[info] Set current project to Example (in build file:/root/)
[info] sbt server started at local:///root/.sbt/1.0/server/27dc1aa3fdf4049b492d/sock
sbt:Example> compile
...
[info] Done compiling.
[success] Total time: 0 s, completed Sep 8, 2019 2:17:14 PM
We can try to run the application by typing `run`.
sbt:Example> run
[info] Running example.Example
Hello world!
[success] Total time: 1 s, completed Sep 8, 2019 2:18:18 PM
Now let's add a function to `example.scala`.
object Example {
def addOne(tuple: (Char, Int)) : (Char, Int) = tuple match {
case (chr, int) => (chr, int+1)
}
def main(args: Array[String]) {
println("Hello world!")
println(addOne('a', 1))
}
}
In your SBT session you can prepend any command with a tilde (`~`) to make it run automatically on source changes.
sbt:Example> ~run
[info] Compiling 1 Scala source to ...
[info] Done compiling.
[info] Packaging ...
[info] Done packaging.
[info] Running example.Example
Hello world!
(a,2)
[success] Total time: 1 s, completed Sep 8, 2019 2:19:03 PM
1. Waiting for source changes in project hello... (press enter to interrupt)
We can also open an interactive session using SBT.
sbt:Example> console
[info] Starting scala interpreter...
Welcome to Scala 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_222).
Type in expressions for evaluation. Or try :help.
scala> example.Example.addOne('a', 1)
res1: (Char, Int) = (a,2)
scala> println("Interactive environment")
Interactive environment
To build Spark applications with SBT we need to include dependencies (Spark most notably) to build the project. Modify your `build.sbt` file like so
name := "Example"
scalaVersion := "2.11.12"
val sparkVersion = "2.4.4"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion
)
We can now use Spark in the script. Modify `example.scala`.
package example
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import java.sql.Timestamp
object ExampleSpark {
case class SensorData (
sensorName: String,
timestamp: Timestamp,
numA: Double,
numB: Double,
numC: Long,
numD: Double,
numE: Long,
numF: Double
)
def main(args: Array[String]) {
val schema =
StructType(
Array(
StructField("sensorname", StringType, nullable=false),
StructField("timestamp", TimestampType, nullable=false),
StructField("numA", DoubleType, nullable=false),
StructField("numB", DoubleType, nullable=false),
StructField("numC", LongType, nullable=false),
StructField("numD", DoubleType, nullable=false),
StructField("numE", LongType, nullable=false),
StructField("numF", DoubleType, nullable=false)
)
)
val spark = SparkSession
.builder
.appName("Example")
.getOrCreate()
val sc = spark.sparkContext // If you need SparkContext object
import spark.implicits._
val ds = spark.read
.schema(schema)
.option("timestampFormat", "MM/dd/yy:hh:mm")
.csv("./sensordata.csv")
.as[SensorData]
val dsFilter = ds.filter(a => a.timestamp ==
new Timestamp(2014 - 1900, 2, 10, 1, 1, 0, 0))
dsFilter.collect.foreach(println)
spark.stop
}
}
You can build a JAR using the `package` command in SBT. This JAR will be located at `target/scala-<version>/<project_name>_<version>.jar`.
You can run the JAR via a `spark-submit` container (which will run in local mode). By mounting the `spark-events` directory, the event log of the application run is stored to be inspected later using the Spark history server.
$ docker run -it --rm -v "`pwd`":/io -v "`pwd`"/spark-events:/spark-events
spark-submit target/scala-2.11/example_2.11-0.1.0-SNAPSHOT.jar
INFO:...
SensorData(COHUTTA,2014-03-10 01:01:00.0,10.27,1.73,881,1.56,85,1.94)
SensorData(NANTAHALLA,2014-03-10 01:01:00.0,10.47,1.712,778,1.96,76,0.78)
SensorData(THERMALITO,2014-03-10 01:01:00.0,10.24,1.75,777,1.25,80,0.89)
SensorData(BUTTE,2014-03-10 01:01:00.0,10.12,1.379,777,1.58,83,0.67)
SensorData(CARGO,2014-03-10 01:01:00.0,9.93,1.903,778,0.55,76,1.44)
SensorData(LAGNAPPE,2014-03-10 01:01:00.0,9.59,1.602,777,0.09,88,1.78)
SensorData(CHER,2014-03-10 01:01:00.0,10.17,1.653,777,1.89,96,1.57)
SensorData(ANDOUILLE,2014-03-10 01:01:00.0,10.26,1.048,777,1.88,94,1.66)
SensorData(MOJO,2014-03-10 01:01:00.0,10.47,1.828,967,0.36,77,1.75)
SensorData(BBKING,2014-03-10 01:01:00.0,10.03,0.839,967,1.17,80,1.28)
INFO:...
By default, Spark's logging is quite verbose. You can change the log level to WARN to reduce the output.
For development purposes you can also try running the application from SBT using the `run` command. You might run into some trouble with threads here, which can be solved by running the application in a forked process, enabled by setting `fork in run := true` in `build.sbt`. You will also have to change the log levels programmatically, if desired.
import org.apache.log4j.{Level, Logger}
...
def main(args: Array[String]) {
...
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
...
}
You can also use this logger to log messages from your application, which might be helpful for debugging on the AWS cluster later on.
You can inspect the event log from the application run using the Spark history server. Start a `spark-history-server` container from the project root folder and mount the `spark-events` folder in the container.
$ docker run -it --rm -v "`pwd`"/spark-events/:/spark-events -p 18080:18080
spark-history-server
starting org.apache.spark.deploy.history.HistoryServer, logging to
/spark/logs/spark--org.apache.spark.deploy.history.HistoryServer-1-d5dfa4949b86.out
Spark Command: /usr/local/openjdk-8/bin/java -cp /spark/conf/:/spark/jars/*
-Xmx1g org.apache.spark.deploy.history.HistoryServer
========================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/09/08 14:25:33 INFO HistoryServer: Started daemon with process name:
14@d5dfa4949b86
19/09/08 14:25:33 INFO SignalUtils: Registered signal handler for TERM
19/09/08 14:25:33 INFO SignalUtils: Registered signal handler for HUP
19/09/08 14:25:33 INFO SignalUtils: Registered signal handler for INT
19/09/08 14:25:34 WARN NativeCodeLoader: Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
19/09/08 14:25:34 INFO SecurityManager: Changing view acls to: root
19/09/08 14:25:34 INFO SecurityManager: Changing modify acls to: root
19/09/08 14:25:34 INFO SecurityManager: Changing view acls groups to:
19/09/08 14:25:34 INFO SecurityManager: Changing modify acls groups to:
19/09/08 14:25:34 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root); groups
with view permissions: Set(); users with modify permissions: Set(root);
groups with modify permissions: Set()
19/09/08 14:25:34 INFO FsHistoryProvider: History server ui acls disabled;
users with admin permissions: ; groups with admin permissions
19/09/08 14:25:35 INFO FsHistoryProvider:
Parsing file:/spark-events/local-1567952519905 for listing data...
19/09/08 14:25:35 INFO Utils: Successfully started service on port 18080.
19/09/08 14:25:35 INFO HistoryServer: Bound HistoryServer to 0.0.0.0,
and started at http://d5dfa4949b86:18080
19/09/08 14:25:36 INFO FsHistoryProvider: Finished parsing
file:/spark-events/local-1567952519905
Navigate to http://localhost:18080 to view detailed information about your jobs. After analysis you can shutdown the Spark history server using ctrl+C.
$ ^C
19/09/08 14:27:18 ERROR HistoryServer: RECEIVED SIGNAL INT
19/09/08 14:27:18 INFO ShutdownHookManager: Shutdown hook called
Be sure to explore the history server thoroughly! You can use it to gain an understanding of how Spark executes your application, as well as to debug and time your code, which is important for both lab 1 and 2.
In this lab, we will design and develop an application to process GDELT data. For a given number of segments, the application should output the 10 most frequently mentioned topics contained in the `AllNames` column. The application will later be used in lab 2 to scale the analysis to the entire dataset.
We recommend you read all the relevant sections on Scala and Spark in the guide. Make sure you have Docker up-and-running and that you have built the required images for Spark and SBT, as per the instructions. You can verify your set-up by going through the steps of the Spark tutorial.
Download the template project from the lab's GitHub repository (contained in the `lab1/` folder), either by forking or cloning the repository, or by downloading a zip file. You should execute all Docker commands from this project folder. Rename the Scala file and the class it contains to something meaningful. Update the project name in `build.sbt`. The project folder also contains a `data/` directory, which will contain the data you download for testing, as explained below. The `data/` folder is automatically mounted in the working directory of the relevant containers.
We will use the GDELT version 2 GKG files. As mentioned, these files are formatted in tab-separated values. The schema is listed below. The columns that are most relevant are `DATE` and `AllNames`.
To use the schema for Spark's `DataFrameReader`, first make sure to import the required Spark SQL types:
import org.apache.spark.sql.types._
The schema can then be specified as a Spark StructType
, as listed below.
val schema = StructType(
Array(
StructField("GKGRECORDID", StringType),
StructField("DATE", DateType),
StructField("SourceCollectionIdentifier", IntegerType),
StructField("SourceCommonName", StringType),
StructField("DocumentIdentifier", StringType),
StructField("Counts", StringType),
StructField("V2Counts", StringType),
StructField("Themes", StringType),
StructField("V2Themes", StringType),
StructField("Locations",StringType),
StructField("V2Locations",StringType),
StructField("Persons",StringType),
StructField("V2Persons",StringType),
StructField("Organizations",StringType),
StructField("V2Organizations",StringType),
StructField("V2Tone", StringType),
StructField("Dates",StringType),
StructField("GCAM", StringType),
StructField("SharingImage", StringType),
StructField("RelatedImages",StringType),
StructField("SocialImageEmbeds",StringType),
StructField("SocialVideoEmbeds",StringType),
StructField("Quotations", StringType),
StructField("AllNames", StringType),
StructField("Amounts",StringType),
StructField("TranslationInfo",StringType),
StructField("Extras", StringType)
)
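Since the GKG segments are tab-separated rather than comma-separated, you need to tell Spark's CSV reader which delimiter to use. Below is a minimal sketch; the path is only an example of where downloaded segments might live, and the `dateFormat` value reflects GDELT's `yyyyMMddHHmmss` timestamps.

// Read one or more GKG segments as tab-separated values with the schema above.
// Adjust the path to wherever your segments are stored.
val gkg = spark.read
  .schema(schema)
  .option("sep", "\t")
  .option("dateFormat", "yyyyMMddHHmmss")
  .csv("data/segment/*.gkg.csv")

gkg.select("DATE", "AllNames").show(5, truncate = false)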
In the `data/` folder you will also find a script called `get_data` and its Powershell equivalent `get_data.ps1`, for use on Windows. This script will download a number of GKG segments to your computer, and generate a file `local_index.txt` containing the paths to the downloaded files.
$ ./get_data 4
...
wget downloading
...
$ cat local_index.txt
/path/to/this/repo/lab1/data/segment/20150218230000.gkg.csv
/path/to/this/repo/lab1/data/segment/20150218231500.gkg.csv
/path/to/this/repo/lab1/data/segment/20150218233000.gkg.csv
/path/to/this/repo/lab1/data/segment/20150218234500.gkg.csv
The script will put all downloaded files in the `segment` folder. `wget` adds timestamps to downloads, so files will not be needlessly downloaded again.
You can use these local files for the first lab assignment to test your application, and build some understanding of the scaling behaviour on a single machine.
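One possible way (purely a sketch; the exact path to `local_index.txt` depends on how the `data/` folder is mounted in your container) to load a chosen number of downloaded segments is to read the index file and hand the paths to Spark, since `sc.textFile` accepts a comma-separated list of paths.

// Load the first n downloaded segments as an RDD of raw lines.
import scala.io.Source

val n = 4
val segmentPaths = Source.fromFile("data/local_index.txt").getLines.take(n).toSeq
val rawGkg = sc.textFile(segmentPaths.mkString(","))   // textFile accepts comma-separated paths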
Your program should output a structure that maps dates to a list of 10 tuples containing the most mentioned topics and the amount of times they were mentioned. An example output of this system based on 10 segments would be:
DateResult(2015-02-19,List((United States,1497), (Islamic State,1233), (New
York,1058), (United Kingdom,735), (White House,723), (Los Angeles,620), (New
Zealand,590), (Associated Press,498), (San Francisco,479), (Practice Wrestling
Room,420)))
DateResult(2015-02-18,List((Islamic State,1787), (United States,1210), (New
York,727), (White House,489), (Los Angeles,424), (Associated Press,385), (New
Zealand,353), (United Kingdom,325), (Jeb Bush,298), (Practice Wrestling
Room,280)))
Or in JSON:
{"data":"2015-02-19","result":[{"topic":"United
States","count":1497},{"topic":"Islamic State","count":1233},{"topic":"New
York","count":1058},{"topic":"United Kingdom","count":735},{"topic":"White
House","count":723},{"topic":"Los Angeles","count":620},{"topic":"New
Zealand","count":590},{"topic":"Associated Press","count":498},{"topic":"San
Francisco","count":479},{"topic":"Practice Wrestling Room","count":420}]}
{"data":"2015-02-18","result":[{"topic":"Islamic
State","count":1787},{"topic":"United States","count":1210},{"topic":"New
York","count":727},{"topic":"White House","count":489},{"topic":"Los
Angeles","count":424},{"topic":"Associated Press","count":385},{"topic":"New
Zealand","count":353},{"topic":"United Kingdom","count":325},{"topic":"Jeb
Bush","count":298},{"topic":"Practice Wrestling Room","count":280}]}
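If you use the Dataset API, one possible (but by no means required) way to model this output is with a pair of case classes mirroring the structure above; the names below are only suggestions.

// Suggested shape for the result; field names mirror the JSON example above.
case class TopicCount(topic: String, count: Int)
case class DateResult(data: java.sql.Date, result: List[TopicCount])

// A Dataset[DateResult] could then be written out as JSON lines with something like
// resultDs.write.json("output")   (resultDs being a hypothetical Dataset[DateResult])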
The exact counts can vary depending on how you count: for instance, if a name is mentioned multiple times per article, do you count it once or multiple times? Something in between? Do you filter out some names that are false positives ("ParentCategory" seems to be a particularly common one)? You are free to implement it however you think is best, and you are encouraged to experiment with this. Document your choices in your report.
The deliverables for the first lab are:
- A Dataframe/Dataset-based implementation of the GDELT analysis,
- An RDD-based implementation of the GDELT analysis,
- A report containing:
- Outline of your implementation and approach (½–1 page);
- Answers to the questions listed below. Be as concise as you can!
For the implementations, only hand in your Scala files. Your code should run without changes in the rest of the project. Your submission should contain 2 Scala files and 1 PDF file containing your report.
The deadline of this lab will be announced on Brightspace. Your report and code will be discussed in a brief oral examination during the lab, the schedule of which will become available later.
General questions:
- In typical use, what kind of operation would be more expensive, a narrow dependency or a wide dependency? Why? (max. 50 words)
- What is the shuffle operation and why is it such an important topic in Spark optimization? (max. 100 words)
- In what way can Dataframes and Datasets improve performance, both in compute and in the distribution of data, compared to RDDs? Under what conditions will Dataframes perform better than RDDs? (max. 100 words)
- Consider the following scenario. You are running a Spark program on a big data cluster with 10 worker nodes and a single master node. One of the worker nodes fails. In what way does Spark's programming model help you recover the lost work? (Think about the execution plan or DAG.) (max. 50 words)
- We might distinguish the following five conceptual parallel programming models:
  - farmer/worker
  - divide and conquer
  - data parallelism
  - function parallelism
  - bulk-synchronous

  Pick one of these models and explain why it does or does not fit Spark's programming model. (max. 100 words)
Implementation analysis questions:
- Measure the execution times for 10, 20, 50, 100 and 150 segments. Do you observe a difference in execution time between your Dataframe and RDD implementations? Is this what you expect and why? (max. 50 words)
- How will your application scale when increasing the number of analyzed segments? What do you expect the progression in execution time to be for 100, 1000, and 10000 segments? (max. 50 words)
- If you extrapolate the scaling behavior on your machine, using your results from question 1, to the entire dataset, how much time will it take to process the entire dataset? Is this extrapolation reasonable for a single machine? (max. 50 words)
- Now suppose you had a cluster of machines identical to the one you performed the analysis on. How many machines do you think you would need to process the entire dataset in under an hour? Do you think this is a valid extrapolation? (max. 50 words)
- Suppose you would run this analysis for a company. What do you think would be an appropriate way to measure the performance? Would it be the time it takes to execute? The amount of money it takes to perform the analysis on the cluster? A combination of these two, or something else? Pick something you think would be an interesting metric, as this is the metric you will be optimizing in the 2nd lab! (max. 100 words)
In the first lab you built an application for a small dataset to analyze the most common topics in the news according to the GDELT dataset. In this lab we will scale this application using Amazon Web Services to process the entire dataset (several terabytes). You are free to pick either your RDD or your Dataframe/Dataset implementation.
We assume everybody has access to AWS credits via both the GitHub developer pack and the AWS classroom in their lab group! If this is not the case, please ask for help, or send us an email.
You pay for the cluster per commissioned minute. After you are done working with a cluster, please terminate it, to avoid unnecessary costs.
Like last lab, we will first give you a short description of the different technologies you will be using before we give the actual assignment.
AWS consists of a variety of different services; the ones relevant for this lab are listed below:
- EC2: Elastic Compute Cloud allows you to provision a variety of different machines that can be used to run a computation. An overview of the different machines and their use cases can be found on the EC2 website.
- EMR: Elastic MapReduce is a layer on top of EC2 that allows you to quickly deploy MapReduce-like applications, for instance Apache Spark.
- S3: Simple Storage Service is an object-based storage system that is easy to interact with from different AWS services.
Note that the GDELT GKG is hosted on AWS S3 in the US East region, so any EC2/EMR instances interacting with this data set should also be provisioned there. At the time of writing, this means that you should select either the Virginia or Ohio region for your instances.
AWS EC2 offers spot instances, a marketplace for unused machines that you can bid on. These spot instances are often an order of magnitude cheaper than on-demand instances. The current price list can be found on the EC2 website. We recommend using spot instances for the entirety of this lab.
We will be using the AWS infrastructure to run the application. Log in to the AWS console, and open the S3 interface. Create a bucket where we can store the application JAR, and all the other files needed by your application.
There are (at least) two ways to transfer files to S3:
- The web interface, and
- The command line interface.
The web interface is straightforward to use. To use the command line interface, first install the AWS CLI. Some example operations are listed below.
To copy a file
aws s3 cp path/to/file s3://destination-bucket/path/to/file
To copy a directory recursively
aws s3 cp --recursive s3://origin-bucket/path/to/directory path/to/local/directory
To move a file
aws s3 mv path/to/file s3://destination-bucket/path/to/file
The aws-cli contains much more functionality, which can be found on the AWS-CLI docs.
Make sure you have uploaded all the necessary files (again: your application JAR, and all the other files needed by your application).
We are now ready to provision a cluster. Go to the EMR service, and select Create Cluster. Next select Go to advanced options, select the latest release, and check the frameworks you want to use. In this case this means Spark, Hadoop, and Ganglia. Spark and Hadoop you already know; we will introduce Ganglia later in this chapter.
EMR works with steps, which can be thought of as a job, or the execution of a single application. You can choose to add steps in the creation of the cluster, but this can also be done at a later time. Press next.
In the Hardware Configuration screen, we can configure the arrangement and selection of the machines. We suggest starting out with `m4.large` machines on spot pricing. You should be fine running a small example workload with a single master node and two core nodes.[3][4] Be sure to select spot pricing and place an appropriate bid. Remember that you can always check the current prices in the information popup or on the Amazon website. After selecting the machines, press next.
In the General Options you can select a cluster name. You can also tune where the system logs are stored, along with a number of other features (more information in the popups). After finishing this step, press next.
You should now arrive at the Security Options screen. If you have not created an EC2 keypair yet, it is recommended that you do so now. This will allow you to access the Yarn, Spark, and Ganglia web interfaces in your browser, which makes debugging and monitoring the execution of your Spark job much more manageable. To create an EC2 keypair, follow these instructions.
After this has all been completed you are ready to spin up your first cluster by pressing Create cluster. Once the cluster has been created, AWS will start provisioning machines. This should take about 10 minutes. In the meantime you can add a step. Go to the Steps foldout, and select Spark application for Step Type. Clicking Configure will open a dialogue in which you can select the location of the application JAR in your S3 bucket, pass any number of arguments to the application and to spark-submit, and set the action on failure.
Make sure you do not try to process the entire dataset in your initial run but, similar to lab 1, start with a few files to confirm that the application works as intended.
The setup will take some time to finish, so in the meantime you should configure a proxy for the web interfaces. More detailed information can be found on the AWS website. You can check the logs in your S3 bucket, or the web interfaces to track the progress of your application and whether any errors have occurred.
By forwarding the web interfaces you will also have access to Ganglia. Ganglia is a tool that allows you to monitor your cluster for incoming and outgoing network traffic, CPU load, memory pressure, and other useful metrics. These can help to characterize the workload at hand, and help optimize computation times. An example of its interface is shown in fig. 2.
It's not uncommon to run into problems when you first deploy your application to AWS; here are some general clues:
- You can access S3 files directly using Spark, i.e. via `SparkContext.textFile` and `SparkSession.read.csv`, but not through the OS, so using an ordinary Java `File` class will not work. If you want to load a file into the environment, you will have to figure out a workaround (see the sketch after this list).
- You can monitor the (log) output of your master and worker nodes in Yarn, which you can access in the web interfaces. It might help you to insert some helpful logging messages in your application.
- Scale your application by increasing the workload by an order of magnitude at a time; some bugs only become apparent when you have a sufficient load on your cluster and a sufficient cluster size. In terms of cost, it's also much cheaper if you do your debugging incrementally on smaller clusters.
- Ensure that your cluster is running in actual cluster mode (this can be visually confirmed by checking the load on the non-master nodes in Ganglia).
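As an example of the first point, reading GKG segments straight from S3 inside your Spark application could look roughly like the sketch below. The bucket and prefix are an assumption on our part (the public GDELT open data bucket as we understand it); verify the exact location and adjust the glob to select the segments you want.

// Hedged sketch: read GKG segments directly from S3 on EMR.
// The bucket/prefix is an assumption; check the GDELT documentation for the exact layout.
val gkgS3 = spark.read
  .schema(schema)
  .option("sep", "\t")
  .csv("s3://gdelt-open-data/v2/gkg/*.gkg.csv")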
For this lab, we would like you to process the entire dataset, meaning all segments, with 20 `c4.8xlarge` core nodes, in under half an hour, using your solution from lab 1. This should cost you less than 12 dollars and is the minimum requirement.
Note that this means that you should evaluate whether your application scales well enough to achieve this before attempting to run it on the entire dataset. Your answers to the questions in lab 1 should help you determine this, but to reiterate, consider e.g. how long it will take to run
- 1000 segments compared to 10000 segments?
- on 4 virtual cores compared to 8, and what about 32?
If your application is not efficient enough right away, you should analyze its bottlenecks and modify it accordingly, or try to gain some extra performance by changing the way Spark is configured. Once you have a good understanding of how your application behaves, you can try a couple of runs on the entire dataset.
For extra points, we challenge you to come up with an even better solution according to the metric you defined in lab 1. You are free to change anything, but some suggestions are:
- Finding additional bottlenecks using Apache Ganglia (do you need more network I/O, more CPU, or more memory?)
- Tuning the kind and number of machines you use on AWS, based on these bottlenecks
- Modifying the application to increase performance
- Tuning Yarn/Spark configuration flags to best match the problem
There is a guide to Spark performance tuning on the Spark website.
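To give an idea of what such tuning can look like in practice, the sketch below sets a few commonly tuned Spark options in code; on EMR you can equally pass them as spark-submit arguments in the step configuration. The values shown are arbitrary starting points for experimentation, not recommendations for this workload.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: a few commonly tuned Spark settings. The values are arbitrary
// starting points, not recommendations for this particular workload.
object TunedSessionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("GDELT GKG analysis")
      .config("spark.executor.memory", "8g")          // memory per executor
      .config("spark.executor.cores", "4")            // cores per executor
      .config("spark.sql.shuffle.partitions", "400")  // partitions created by shuffles
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // ... your lab 1 pipeline goes here ...

    spark.stop()
  }
}
```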
The deliverables for this lab are:
- A report outlining your choices in terms of configuration and your results.
- A presentation (maximum 5 slides/minutes) in which you present your work and results to the class. Try to put an emphasis on the improvements you found and on which settings, configurations, or changes had the most impact.
In the report, there should be a justification for why you chose the cluster configuration you did. If you have measurements for multiple cluster configurations, please include them. Also detail all the improvements you found and explain why they improved efficiency.
In the third and final lab of SBD we will be implementing a streaming application. As many of you have noted in the first lab questions, Spark is not well suited for real-time streaming, because of its batch-processing nature. Therefore, we will be using Apache Kafka for this lab. You will be provided with a Kafka stream of GDELT records, for which we want you to create a histogram of the most popular topics of the last hour that will continuously update. We included another visualizer for this lab that you can see in fig. 3.
Apache Kafka is a distributed streaming platform. Its core abstraction is the message queue: you can publish streams of records to a queue and subscribe to the streams of records in it. Each queue is identified by a topic name. Apache Kafka is:
- Resilient by means of replication;
- Scalable on a cluster;
- High-throughput and low-latency; and
- A persistent store.
Kafka consists of four APIs, described as follows in the Kafka docs (a minimal Producer API sketch follows this list):
- The Producer API allows an application to publish a stream of records to one or more Kafka topics.
- The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
- The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams into output streams.
- The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
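To make the Producer API a bit more concrete, here is a minimal sketch that publishes a single record to a topic on a local broker (such as the one you will start later in this lab). The topic name and record contents are placeholders.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Minimal sketch of the Producer API: publish one record to a topic.
// The topic name and record contents are placeholders.
object ProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord[String, String]("example-topic", "some-key", "some-value"))
    producer.flush()
    producer.close()
  }
}
```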
Before you start with the lab, please read the Introduction to Kafka on the Kafka website to become familiar with the Apache Kafka abstractions and internals. You can find instructions on how to install Kafka on your machine here. A good introduction to the Kafka Streams API can be found here. We recommend you go through the code and examples.
We will again be using Scala for this assignment. Although Kafka’s API is written completely in Java, the Streams API has been wrapped in a Scala API for convenience. You can find the Scala KStreams documentation here; for API docs on the other parts of Kafka, such as StateStores, please refer to this link.
In the lab’s repository you will find a template for your solution. There are a number of scripts (.sh for MacOS/Linux, .bat for Windows). For these scripts to work, you first have to set a KAFKA_HOME environment variable pointing to the root of the Kafka installation directory. The Kafka installation directory should contain the following directories:
$ tree .
├── bin
│ └── windows
├── config
├── libs
└── site-docs
Once that has been set up, copy the lab files from the GitHub repository. Try to run kafka_start.sh or kafka_start.bat, depending on your OS. If you receive an error about being unable to find a java binary, make sure you have Java installed and that it is in your path.
The kafka_start script does a number of things:
- Start a Zookeeper server on port 2181, which acts as a naming, configuration, and task coordination service
- Start a single Kafka broker on port 9092
Navigate to the GDELTProducer directory and run sbt run to start[5] the GDELT stream.
We can now inspect the output of the gdelt topic by running the following command on MacOS/Linux:
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic gdelt --property print.key=true --property key.separator=-
Or on Windows PowerShell:
& "$env:KAFKA_HOME\bin\windows\kafka-console-consumer.bat" --bootstrap-server localhost:9092 `
--topic gdelt --property print.key=true --property key.separator=-
Or on Windows cmd:
%KAFKA_HOME%\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 ^
--topic gdelt --property print.key=true --property key.separator=-
If you see output appearing, you are now ready to start on the assignment.
As mentioned before, for this assignment, we will no longer batch process the GDELT Global Knowledge Graph, but rather stream it into a pipeline that computes a histogram of the last hour. This pipeline is depicted by fig. 4. We will give a small description of the individual parts below.
- Producer: The producer, contained in the GDELTProducer Scala project, starts by downloading all segments of the previous hour (minus a 15-minute offset) and immediately starts streaming the records (rows) to a Kafka topic called gdelt. Simultaneously, it schedules a new download step at the next quarter of the hour. The rate at which records are streamed is determined as the number of currently queued records divided by the time left until new data is downloaded from S3.
- Transformer: The transformer receives GDELT records on the gdelt topic and should use them to construct a histogram of the names in the “allNames” column of the dataset, but only for the last hour. This is very similar to the application you wrote in lab 1, except that it happens in real time and you should take care to also decrement/remove names that are older than an hour (relative to your input data). Finally, the transformer’s output should appear on a Kafka topic called gdelt-histogram.
- Consumer: The consumer acts as a sink and processes the incoming histogram updates from the transformer into a smaller histogram of only the 100 most frequently occurring names for display[6]. It then streams this histogram to our visualizer over a WebSocket connection.
You are now tasked with writing an implementation of the histogram transformer.
In the file GDELTStream/GDELTStream.scala you will have to implement the following:
- GDELT row processing: In the main function, you will first have to write a function that transforms the GDELT lines into a stream of values from the allNames column. You can achieve this using the high-level Kafka Streams API on the KStream object (a sketch of what this could look like follows this list).
- HistogramTransformer: You will have to implement the HistogramTransformer using the processor/transformer API of Kafka Streams to convert the stream of allNames into a histogram of the last hour. We suggest you look at state stores for Kafka Streams.
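To give an idea of the first point, the sketch below builds a small topology that turns raw GKG rows into a stream of individual names using the high-level DSL. The tab-separated column index (23) and the name,offset layout of the allNames field are assumptions about the GKG format, so verify them against your lab 1 parsing code; the structure of the provided template takes precedence over the wiring shown here.

```scala
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.KStream

// Sketch only: turn raw GKG rows into a stream of individual names and print them.
// The column index (23) and the "name,offset;name,offset" layout of the allNames
// field are assumptions; check them against your lab 1 parsing code.
object RowProcessingSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "row-processing-sketch")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

    val builder = new StreamsBuilder
    val records: KStream[String, String] = builder.stream[String, String]("gdelt")

    val names: KStream[String, String] = records
      .mapValues(row => row.split("\t", -1).lift(23).getOrElse(""))          // allNames column
      .flatMapValues(field => field.split(";").map(_.split(",")(0)).toList)  // one name per record
      .filter((_, name) => name.nonEmpty)

    names.foreach((_, name) => println(name))

    val streams = new KafkaStreams(builder.build(), props)
    streams.start()
    sys.ShutdownHookThread { streams.close() }
  }
}
```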
You will have to write the result of this stream to a new topic called gdelt-histogram. This stream should consist of records (key-value pairs) of the form (name, count) with type (String, Long), where the value of name was extracted from the “allNames” column.
This means that whenever the transformer reads a name from the “allNames” column, it should publish an updated, i.e. incremented, (name, count) pair on the output topic, so that the visualizer can update accordingly. When you decrement a name, because its occurrence is older than an hour, remember to publish an update as well!
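To make the shape of the processor/transformer API concrete, below is a bare skeleton of such a transformer backed by a key-value state store. The store name name-counts is made up and has to match a store you register on the StreamsBuilder (for example with Stores.keyValueStoreBuilder) and pass to KStream.transform. The sketch only increments counts and forwards the updated (name, count) pair; the one-hour bookkeeping and the decrement updates it must publish are deliberately left out, since that is the core of the assignment. Note that the exact Transformer interface differs slightly between Kafka versions.

```scala
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

// Bare skeleton of a state-store-backed transformer. The store name "name-counts"
// is a placeholder and must match the store registered on your StreamsBuilder.
// The one-hour eviction (and the decrement updates it must publish) is left out.
class HistogramTransformer extends Transformer[String, String, KeyValue[String, Long]] {
  private var context: ProcessorContext = _
  private var counts: KeyValueStore[String, java.lang.Long] = _

  override def init(context: ProcessorContext): Unit = {
    this.context = context
    counts = context
      .getStateStore("name-counts")
      .asInstanceOf[KeyValueStore[String, java.lang.Long]]
    // A punctuator registered here via context.schedule(...) could periodically walk
    // the store and decrement or remove names older than one hour.
  }

  // Each incoming record carries one name: bump its count and emit the new total.
  override def transform(key: String, name: String): KeyValue[String, Long] = {
    val updated = Option(counts.get(name)).map(_.longValue).getOrElse(0L) + 1L
    counts.put(name, updated)
    new KeyValue(name, updated)
  }

  override def close(): Unit = ()
}
```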
To run the visualizer, first start the WebSocket server by navigating to the GDELTConsumer directory and running sbt run. Next, navigate to the visualization directory in the root of the GitHub repository, under assignment 3, and open index.html. Once that is opened, press open web socket to start the visualization.
The deliverables for this lab are:
- A complete zip of the entire project, including your implementation of GDELTStream.scala (please remove all data files from the zip!)
- A report containing
  - an outline of the code (less than half a page), and
  - answers to the questions listed below.
Try to be concise with your answers. Some questions have a maximum word count, but you are welcome to use fewer words.
- What is the difference, in terms of data processing, between Kafka and Spark?
- What is the difference between replications and partitions?
- What is Zookeeper’s role in the Kafka cluster? Why do we need a separate entity for this? (max. 50 words)
- Why does Kafka by default not guarantee exactly once delivery semantics on producers? (max. 100 words)
- Kafka is a binary protocol (with a reference implementation in Java), whereas Spark is a framework. Name two (of the many) advantages of Kafka being a binary protocol in the context of Big Data. (max. 100 words)
- On average, how many bytes per second does the stream transformer have to consume? How many does it produce?
- Could you use a Java/Scala data structure instead of a Kafka State Store to manage your state in a processor/transformer? Why, or why not? (max. 50 words)
- Given that the histogram is stored in a Kafka StateStore, how would you extract the top 100 topics? Is this efficient? (max. 75 words)
- The visualizer draws the histogram in your web browser. A Kafka consumer has to communicate the current ‘state’ of the histogram to this visualizer. What do you think is an efficient way of streaming the ‘state’ of the histogram to the webserver? (max. 75 words)
- What are the two ways you can scale your Kafka implementation over multiple nodes? (max. 100 words)
- How could you use Kafka’s partitioning to compute the histogram in parallel? (max. 100 words)
[1] Normally the image is pulled from Docker Hub, but the tag with the required version of Scala for Spark is no longer available.
[2] Since Java 8, Java also supports anonymous functions, or lambda expressions, but this version had not been released at the time of Spark’s initial release.
[3] You always need a master node, which is tasked with distributing resources and managing tasks for the core nodes. We recommend using the cheap m4.large instance. If you start to notice unexplained bottlenecks for tasks with many machines and a lot of data, you might want to try a larger master node. Ganglia should provide you with some insights regarding this matter.
[4] By default, there are some limitations on the number of spot instances your account is allowed to provision. If you don’t have access to enough spot instances, the procedure to request more can be found in the AWS documentation.
[5] As this issue suggests, you might need to run sbt run twice when starting the producer for the first time.
[6] It might turn out that this is too much for your browser to handle. If this is the case, you may change it manually in the HistogramProcessor contained in GDELTConsumer.scala.