
Commit

Improving code documentation
san089 committed Feb 28, 2020
1 parent 3d45632 commit 187bf93
Showing 4 changed files with 40 additions and 1 deletion.
12 changes: 12 additions & 0 deletions src/goodreads_driver.py
@@ -8,6 +8,7 @@
from warehouse.goodreads_warehouse_driver import GoodReadsWarehouseDriver
import time

# Setting configurations. See config.cfg for more details
config = configparser.ConfigParser()
config.read_file(open(f"{Path(__file__).parents[0]}/config.cfg"))

@@ -18,13 +19,22 @@


def create_sparksession():
"""
Initialize a Spark session with Hive support.
"""
# spark.jars.packages takes a comma-separated list; setting the same key twice keeps only the last value
return SparkSession.builder.master('yarn').appName("goodreads") \
.config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.2") \
.enableHiveSupport().getOrCreate()


def main():
"""
This method performs the following tasks:
1. Check for data in the Landing Zone; if new files are present, move them to the Working Zone.
2. Transform the data in the Working Zone and save the transformed data to the Processed Zone.
3. Run the data warehouse workflow: set up the staging tables, load them, and perform upsert operations on the warehouse.
"""
logging.debug("\n\nSetting up Spark Session...")
spark = create_sparksession()
grt = GoodreadsTransform(spark)
@@ -66,5 +76,7 @@ def main():
logging.debug("Performing UPSERT")
grwarehouse.perform_upsert()


# Entry point for the pipeline
if __name__ == "__main__":
main()
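For reference, below is a minimal sketch of how the steps listed in the main() docstring could be wired together. Only GoodreadsTransform, GoodReadsWarehouseDriver, perform_upsert() and s3_move_data() appear in this diff; the S3 helper class name and the transform and staging method names used here are assumptions for illustration.

import logging

def run_pipeline():
    spark = create_sparksession()

    # 1. Move new files from the Landing Zone to the Working Zone
    s3 = GoodReadsS3Module()                   # assumed class name for src/s3_module.py
    s3.s3_move_data()                          # defaults to landing zone -> working zone

    # 2. Transform data in the Working Zone and save it to the Processed Zone
    grt = GoodreadsTransform(spark)
    grt.transform_reviews()                    # assumed method name
    grt.transform_books()                      # assumed method name

    # 3. Set up staging tables, load them, then upsert into the warehouse
    grwarehouse = GoodReadsWarehouseDriver()   # constructor arguments not shown in this diff
    grwarehouse.setup_staging_tables()         # assumed method name
    grwarehouse.load_staging_tables()          # assumed method name
    logging.debug("Performing UPSERT")
    grwarehouse.perform_upsert()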
7 changes: 7 additions & 0 deletions src/goodreads_transform.py
@@ -11,6 +11,13 @@
config.read_file(open(f"{Path(__file__).parents[0]}/config.cfg"))

class GoodreadsTransform:
"""
This class performs transformation operations on the dataset:
1. Transform the timestamp format, clean text fields, and remove extra spaces.
2. Create a lookup dataframe containing the id and the timestamp of the latest record for each id.
3. Join this lookup dataframe with the original dataframe to keep only the latest records.
4. Save the dataset after repartitioning, using gzip compression.
"""

def __init__(self, spark):
self._spark = spark
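As a reference for points 2-4 of the class docstring, here is a minimal PySpark sketch of keeping only the latest record per id and writing the result with gzip compression. The column names, output path and csv format are assumptions for illustration, not taken from the repository.

from pyspark.sql import functions as F

def keep_latest_records(df, id_col="review_id", ts_col="record_create_timestamp"):
    # Lookup dataframe: latest timestamp per id (column names are assumed)
    lookup = df.groupBy(id_col).agg(F.max(ts_col).alias(ts_col))
    # Join back to keep only the latest record for each id
    return df.join(lookup, on=[id_col, ts_col], how="inner")

# Example write with repartitioning and gzip compression (path and format are assumptions):
# keep_latest_records(reviews_df).repartition(10) \
#     .write.csv("s3a://processed-zone/reviews", mode="overwrite", compression="gzip", header=True)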
2 changes: 2 additions & 0 deletions src/goodreads_udf.py
@@ -3,8 +3,10 @@
from time import strptime
from datetime import datetime

# A simple UDF to remove extra spaces from text
remove_extra_spaces = udf(lambda x: ' '.join(x.split()), StringType())

# A UDF to convert a date string to a timestamp
@udf(TimestampType())
def stringtodatetime(datestring):
x = datestring.split()
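A short, hypothetical usage example for the two UDFs above; the dataframe and column names are assumptions for illustration.

from pyspark.sql.functions import col

# 'reviews_df', 'review_text' and 'date_added' are assumed names
cleaned_df = (reviews_df
    .withColumn("review_text", remove_extra_spaces(col("review_text")))
    .withColumn("date_added", stringtodatetime(col("date_added"))))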
20 changes: 19 additions & 1 deletion src/s3_module.py
@@ -19,6 +19,13 @@ def __init__(self):
self._processed_zone = config.get('BUCKET','PROCESSED_ZONE')

def s3_move_data(self, source_bucket = None, target_bucket= None):
"""
Detect files in the source bucket and move them to the target bucket.
:param source_bucket: name of the source bucket
:param target_bucket: name of the target bucket
"""

# If no arguments are passed, default to the project's landing zone and working zone
if source_bucket is None:
source_bucket = self._landing_zone
if target_bucket is None:
@@ -35,13 +42,24 @@ def s3_move_data(self, source_bucket = None, target_bucket= None):
logging.debug(f"Copying file {key} from {source_bucket} to {target_bucket}")
self._s3.meta.client.copy({'Bucket': source_bucket, 'Key': key}, target_bucket, key)

# Cleanup of the source bucket is commented out to avoid re-uploading
# files to S3 repeatedly when testing heavy loads on the ETL.

#self.clean_bucket(source_bucket)

def get_files(self, bucket_name):
"""
Get all the files present in the given bucket.
:param bucket_name: bucket to search
:return: list of keys (files) present in the bucket
"""
logging.debug(f"Inspecting bucket : {bucket_name} for files")
return [bucket_object.key for bucket_object in self._s3.Bucket(bucket_name).objects.all()]

def clean_bucket(self, bucket_name):
"""
Clean the bucket by deleting all the files in it.
:param bucket_name: name of the bucket to clean
"""
logging.debug(f"Cleaning bucket : {bucket_name}")
self._s3.Bucket(bucket_name).objects.all().delete()
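A small standalone boto3 sketch of the copy-then-delete pattern these methods implement; the bucket names below are placeholders, and the class that wraps these calls in s3_module.py is not shown in this diff.

import boto3

s3 = boto3.resource('s3')
source_bucket, target_bucket = 'goodreads-landing-zone', 'goodreads-working-zone'  # placeholder names

# Copy every object to the target bucket, keeping the same key
for obj in s3.Bucket(source_bucket).objects.all():
    s3.meta.client.copy({'Bucket': source_bucket, 'Key': obj.key}, target_bucket, obj.key)

# Cleanup of the source bucket (mirrors clean_bucket; commented out in the module during testing)
# s3.Bucket(source_bucket).objects.all().delete()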
