-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathSpark_load_s3_file_V2.py
59 lines (45 loc) · 1.74 KB
/
Spark_load_s3_file_V2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.ml import Pipeline
LINE_LENGTH = 200


def print_horizontal(length=None, char='-'):
    """
    Print a horizontal rule followed by a newline to stdout.

    :param length: number of times ``char`` is repeated; defaults to the
        module-level ``LINE_LENGTH`` (looked up at call time, so later changes
        to the constant are honoured).
    :param char: string repeated to draw the rule.
    :return: None
    """
    if length is None:
        length = LINE_LENGTH
    # Build the rule once instead of issuing one write call per character.
    sys.stdout.write(char * length + "\n")
# Verify the Spark modules are importable and report the result.
# NOTE(review): pyspark is also imported unguarded at the top of this file, so
# a missing installation raises ImportError there before this guard runs.
# Move the top-level pyspark imports into this try block if the friendly
# failure message below is actually wanted.
try:
    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    print("Successfully imported Spark Modules -- `SparkContext, SQLContext`")
    print_horizontal()
except ImportError as e:
    # Diagnostics go to stderr so they are not mixed into the program output;
    # sys.stderr.write behaves the same on Python 2 and 3 (unlike a
    # parenthesised multi-argument print, which prints a tuple on Python 2).
    sys.stderr.write("Can not import Spark Modules {0}\n".format(e))
    sys.exit(1)
conf = SparkConf().setAppName("load s3 bucket file")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sparkContext=sc)

try:
    # Load the Parquet file stored in AWS S3 into a DataFrame.
    parquetFile = sqlContext.read.parquet("s3://jon-parquet-format/nation.plain.parquet")

    # Register the DataFrame as a temporary table named "parquetFile" so the
    # SQL queries below can reference it. This only registers a name; it does
    # not cache the data in memory.
    parquetFile.registerTempTable("parquetFile")

    # Run a standard SQL query against the temporary table.
    nations_all_sql = sqlContext.sql("SELECT * FROM parquetFile")

    # DataFrame has no .map() in Spark 2.0+; map over the underlying RDD
    # instead (available since Spark 1.3, so this works on every version).
    nations_all = nations_all_sql.rdd.map(
        lambda p: "Country: {0:15} Ipsum Comment: {1}".format(p.name, p.comment_col))
    print("All Nations and Comments -- `SELECT * FROM parquetFile`")
    print_horizontal()
    for nation in nations_all.collect():
        print(nation)

    # Use standard SQL to filter the table.
    nations_filtered_sql = sqlContext.sql("SELECT name FROM parquetFile WHERE name LIKE '%UNITED%'")

    # Format and print the filtered result set.
    nations_filtered = nations_filtered_sql.rdd.map(lambda p: "Country: {0:20}".format(p.name))
    print_horizontal()
    print("Nations Filtered -- `SELECT name FROM parquetFile WHERE name LIKE '%UNITED%'`")
    print_horizontal()
    for nation in nations_filtered.collect():
        print(nation)
finally:
    # Release the SparkContext even if loading or a query fails.
    sc.stop()