diff --git a/spark/compras_top_ten_countries.py b/spark/compras_top_ten_countries.py index e5b8312..e9c99f4 100644 --- a/spark/compras_top_ten_countries.py +++ b/spark/compras_top_ten_countries.py @@ -1,13 +1,14 @@ +import sys from pyspark import SparkContext -from helpers import item_fields, parse_item +from helpers import dataUrl, item_fields, parse_item sc = SparkContext('local', 'compras') -txt = sc.textFile('data/compras_tiny.csv') +txt = sc.textFile(dataUrl('compras_tiny.csv')) no_header = txt.filter(lambda s: not s.startswith(item_fields[0])) parsed = no_header.map(lambda s: parse_item(s)).cache() countries_rdd = sc \ - .textFile('./data/country_codes.csv') \ + .textFile(dataUrl('country_codes.csv')) \ .map(lambda c: tuple(reversed(c.split(',')))) join_rdd = parsed \ @@ -20,5 +21,5 @@ print(join_rdd.take(10)) # print map(lambda i: (i[0], i[1][1], i[1][0]), join_rdd.take(10)) -# join_rdd.saveAsTextFile('./top10countries', 'org.apache.hadoop.io.compress.GzipCodec') +# join_rdd.saveAsTextFile(dataUrl('out/top10countries'), 'org.apache.hadoop.io.compress.GzipCodec') diff --git a/spark/helpers.py b/spark/helpers.py index 5ca839a..4fbc790 100644 --- a/spark/helpers.py +++ b/spark/helpers.py @@ -52,3 +52,8 @@ def isoDate(raw_string): except Exception: return None +def dataUrl(fileName): + base = "./data" + # base = "gs://bigdataupv_data" + return os.path.join(base, fileName) +