Skip to content

Commit

Permalink
Add data path helper to ease execution in gcp dataproc
Browse files Browse the repository at this point in the history
  • Loading branch information
luisbelloch committed Feb 7, 2019
1 parent 379c314 commit 795d4a9
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
9 changes: 5 additions & 4 deletions spark/compras_top_ten_countries.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import sys
from pyspark import SparkContext
from helpers import item_fields, parse_item
from helpers import dataUrl, item_fields, parse_item

sc = SparkContext('local', 'compras')
txt = sc.textFile('data/compras_tiny.csv')
txt = sc.textFile(dataUrl('compras_tiny.csv'))
no_header = txt.filter(lambda s: not s.startswith(item_fields[0]))
parsed = no_header.map(lambda s: parse_item(s)).cache()

countries_rdd = sc \
.textFile('./data/country_codes.csv') \
.textFile(dataUrl('country_codes.csv')) \
.map(lambda c: tuple(reversed(c.split(','))))

join_rdd = parsed \
Expand All @@ -20,5 +21,5 @@
print(join_rdd.take(10))

# print map(lambda i: (i[0], i[1][1], i[1][0]), join_rdd.take(10))
# join_rdd.saveAsTextFile('./top10countries', 'org.apache.hadoop.io.compress.GzipCodec')
# join_rdd.saveAsTextFile(dataUrl('out/top10countries'), 'org.apache.hadoop.io.compress.GzipCodec')

5 changes: 5 additions & 0 deletions spark/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,8 @@ def isoDate(raw_string):
except Exception:
return None

def dataUrl(fileName):
    """Return the full path (or URL) of a data file.

    The base location defaults to the local ``./data`` directory, but it can
    be overridden through the ``DATA_URL_BASE`` environment variable — e.g.
    set it to ``gs://bigdataupv_data`` when running on GCP Dataproc — so no
    source edit is needed per environment.
    """
    # NOTE: os.path.join also works for URL-like bases such as "gs://bucket".
    base = os.environ.get("DATA_URL_BASE", "./data")
    return os.path.join(base, fileName)

0 comments on commit 795d4a9

Please sign in to comment.