"""
############################################################################################
REF 1) : Streaming basics
https://github.com/clumdee/Python-and-Spark-for-Big-Data-master/blob/master/Spark_Streaming/streaming_terminal_with_RDD.ipynb
SPARK STREAM DEMO CODE
REF 2) : temp SQL table in Streaming window
https://spark.apache.org/docs/1.6.2/streaming-programming-guide.html
****************** QUICK START ******************
STEP 1) open one terminal and run:
$ nc -lk 9999    (start a netcat listener on localhost:9999 that feeds the streaming session)
then type some lines within that terminal, e.g.
hello world
hello world
lol
lo
l
.
.
STEP 2) open another terminal and run:
$ spark-submit Spark_stream_demo.py
You should see the Spark stream pick up the session input and respond as below:
-------------------------------------------
Time: 2018-12-29 09:44:30
-------------------------------------------
('', 4)
('l', 2)
('lo', 2)
('world', 4)
('ll', 1)
('hello', 4)
('lol', 4)
========= 2018-12-29 09:44:30 =========
+-----+-----+
| word|total|
+-----+-----+
| l| 2|
| lol| 4|
|hello| 4|
| ll| 1|
|world| 4|
| | 4|
| lo| 2|
+-----+-----+
############################################################################################
"""
from pyspark.streaming import StreamingContext
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
# Helper function
# Lazily instantiated global instance of SQLContext
def getSqlContextInstance(sparkContext):
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']
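# Illustrative check (hypothetical snippet, not part of the original demo):
# repeated calls hand back the same object, so every micro-batch reuses a
# single SQLContext instead of constructing a new one.
#   assert getSqlContextInstance(sc) is getSqlContextInstance(sc)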
def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SQLContext
        sqlContext = getSqlContextInstance(rdd.context)
        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = sqlContext.createDataFrame(rowRdd)
        # Register as a temporary table
        wordsDataFrame.registerTempTable("words")
        # Do a word count on the table using SQL and print it
        wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
    except Exception:
        # empty micro-batches make createDataFrame() raise, so just skip them
        pass
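# A minimal alternative sketch (assumption: process_skip_empty is not part of
# the original demo): test for an empty micro-batch up front instead of
# relying on the broad except, since createDataFrame() raises on an empty RDD.
def process_skip_empty(time, rdd):
    if rdd.isEmpty():
        return  # nothing arrived in this batch window
    print("========= %s =========" % str(time))
    sqlContext = getSqlContextInstance(rdd.context)
    wordsDataFrame = sqlContext.createDataFrame(rdd.map(lambda w: Row(word=w)))
    wordsDataFrame.registerTempTable("words")
    sqlContext.sql("select word, count(*) as total from words group by word").show()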
if __name__ == '__main__':
# PART 1) : BASIC STREAMING
# CONFIG
# create a SparkContext object with 2 local threads
    # name it "NetworkWordCount"
    sc = SparkContext('local[2]', 'NetworkWordCount')
    print(sc)
    # pass the SparkContext to a StreamingContext object
    # with a batch duration of e.g. 10s
    ssc = StreamingContext(sc, 10)
# set where the data streaming will come from e.g. localhost:9999
lines = ssc.socketTextStream('localhost', 9999)
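    # Illustrative alternative source (a sketch, not in the original demo):
    # stream newline-delimited text files dropped into a directory instead of
    # a socket; '/tmp/stream_input' is a hypothetical path.
    #   lines = ssc.textFileStream('/tmp/stream_input')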
    # split each line on spaces into a list of words
words = lines.flatMap(lambda line: line.split(' '))
# create a tuple of each word and 1 using 'map'
# e.g. word_0 --> (word_0, 1)
pairs = words.map(lambda word: (word, 1))
    # sum the counts per distinct word using reduceByKey
word_counts = pairs.reduceByKey(lambda num1, num2: num1 + num2)
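    # Worked example of the flow for one batch (illustrative values):
    #   lines:        ["hello world", "hello"]
    #   flatMap   ->  ['hello', 'world', 'hello']
    #   map       ->  [('hello', 1), ('world', 1), ('hello', 1)]
    #   reduceByKey   [('hello', 2), ('world', 1)]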
# print elements of the RDD
word_counts.pprint()
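    # Note: pprint() shows at most the first 10 elements of each RDD per batch
    # by default; pass a count to widen it, e.g. word_counts.pprint(20)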
    print(word_counts)
    # to run only PART 1), uncomment these lines and drop PART 2) below
    # ssc.start()
    # ssc.awaitTermination()  # Wait for the computation to terminate
# PART 2) SPARK SQL IN STREAMING WINDOW
words.foreachRDD(process)
ssc.start()
ssc.awaitTermination() # Wait for the computation to terminate
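    # Alternative shutdown sketch (an assumption, not part of the original
    # flow): instead of awaitTermination() above, bound the demo run and stop
    # the context gracefully; the 60s timeout is an arbitrary example value.
    #   if not ssc.awaitTerminationOrTimeout(60):
    #       ssc.stop(stopSparkContext=True, stopGraceFully=True)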