diff --git a/src/goodreads_driver.py b/src/goodreads_driver.py
index d2d23d1..7a5b669 100644
--- a/src/goodreads_driver.py
+++ b/src/goodreads_driver.py
@@ -8,6 +8,7 @@
 from warehouse.goodreads_warehouse_driver import GoodReadsWarehouseDriver
 import time
 
+# Set up configuration. See config.cfg for more details
 config = configparser.ConfigParser()
 config.read_file(open(f"{Path(__file__).parents[0]}/config.cfg"))
 
@@ -18,6 +19,9 @@
 
 def create_sparksession():
+    """
+    Initialize a Spark session.
+    """
     return SparkSession.builder.master('yarn').appName("goodreads") \
         .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
         .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.2") \
@@ -25,6 +29,12 @@ def create_sparksession():
 
 def main():
+    """
+    This method performs the following tasks:
+    1. Check for data in the Landing Zone; if new files are present, move them to the Working Zone.
+    2. Transform the data in the Working Zone and save the transformed data to the Processed Zone.
+    3. Run the data warehouse flow: set up the staging tables, load them, and perform upsert operations on the warehouse.
+    """
     logging.debug("\n\nSetting up Spark Session...")
     spark = create_sparksession()
     grt = GoodreadsTransform(spark)
@@ -66,5 +76,7 @@ def main():
     logging.debug("Performing UPSERT")
     grwarehouse.perform_upsert()
 
+
+# Entry point for the pipeline
 if __name__ == "__main__":
     main()
diff --git a/src/goodreads_transform.py b/src/goodreads_transform.py
index 6d6a946..25ff083 100644
--- a/src/goodreads_transform.py
+++ b/src/goodreads_transform.py
@@ -11,6 +11,13 @@
 config.read_file(open(f"{Path(__file__).parents[0]}/config.cfg"))
 
 class GoodreadsTransform:
+    """
+    This class performs transformation operations on the dataset:
+    1. Transform the timestamp format, clean up text fields and remove extra spaces.
+    2. Create a lookup dataframe containing the id and the timestamp of the latest record.
+    3. Join this lookup dataframe with the original dataframe to keep only the latest records.
+    4. Repartition and save the dataset using gzip compression.
+    """
 
     def __init__(self, spark):
         self._spark = spark
diff --git a/src/goodreads_udf.py b/src/goodreads_udf.py
index 28f2219..a7ba108 100644
--- a/src/goodreads_udf.py
+++ b/src/goodreads_udf.py
@@ -3,8 +3,10 @@
 from time import strptime
 from datetime import datetime
 
+# A simple UDF to remove extra spaces from text
 remove_extra_spaces = udf(lambda x: ' '.join(x.split()) , StringType())
 
+# A UDF to transform a date string into a timestamp
 @udf(TimestampType())
 def stringtodatetime(datestring):
     x = datestring.split()
diff --git a/src/s3_module.py b/src/s3_module.py
index c6aa195..c8e0f7c 100644
--- a/src/s3_module.py
+++ b/src/s3_module.py
@@ -19,6 +19,13 @@ def __init__(self):
         self._processed_zone = config.get('BUCKET','PROCESSED_ZONE')
 
     def s3_move_data(self, source_bucket = None, target_bucket= None):
+        """
+        Detect files in the source bucket and move them to the target bucket.
+        :param source_bucket: name of the source bucket
+        :param target_bucket: name of the target bucket
+        """
+
+        # If no arguments are passed, default to the project's landing zone and working zone
         if source_bucket is None:
             source_bucket = self._landing_zone
         if target_bucket is None:
@@ -35,13 +42,24 @@ def s3_move_data(self, source_bucket = None, target_bucket= None):
             logging.debug(f"Copying file {key} from {source_bucket} to {target_bucket}")
             self._s3.meta.client.copy({'Bucket': source_bucket, 'Key': key}, target_bucket, key)
 
-        # cleanup source bucket
+        # Cleanup of the source bucket is commented out to avoid re-uploading
+        # files to S3 again and again when testing heavy loads on the ETL.
+        #self.clean_bucket(source_bucket)
 
     def get_files(self, bucket_name):
+        """
+        Get all the files present in the provided bucket.
+        :param bucket_name: bucket to search
+        :return: keys of the files present in the bucket
+        """
         logging.debug(f"Inspecting bucket : {bucket_name} for files present")
         return [bucket_object.key for bucket_object in self._s3.Bucket(bucket_name).objects.all()]
 
     def clean_bucket(self, bucket_name):
+        """
+        Clean the bucket by deleting all files.
+        :param bucket_name: name of the bucket to clean
+        """
         logging.debug(f"Cleaning bucket : {bucket_name}")
         self._s3.Bucket(bucket_name).objects.all().delete()
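Reviewer note: steps 2-4 of the new GoodreadsTransform docstring describe a "keep only the latest record per id" pattern. Below is a minimal PySpark sketch of that pattern, not the repository's implementation; the column names (`review_id`, `record_create_timestamp`) and the S3 paths are placeholders assumed for illustration.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("dedup-sketch").getOrCreate()

# Read one dataset from the working zone (path and column names are illustrative)
reviews = spark.read.csv("s3a://<working-zone>/reviews/", header=True, inferSchema=True)

# Step 2: lookup dataframe with the latest timestamp per id
latest = (reviews
          .groupBy("review_id")
          .agg(F.max("record_create_timestamp").alias("record_create_timestamp")))

# Step 3: join back so only the latest record per id survives
deduped = reviews.join(latest, on=["review_id", "record_create_timestamp"], how="inner")

# Step 4: repartition and write to the processed zone with gzip compression
(deduped
 .repartition(10)
 .write
 .csv("s3a://<processed-zone>/reviews/", mode="overwrite", compression="gzip", header=True))
```

Joining the aggregated lookup back on both the id and the timestamp avoids a window function; if two records ever shared the same id and timestamp, both would survive, so a final `dropDuplicates(["review_id"])` could be added if that matters.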
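Similarly, the new comment on `stringtodatetime` in goodreads_udf.py only says it transforms a date string into a timestamp, and its body is not shown in this diff. A sketch of what such a conversion could look like, assuming Goodreads-style date strings such as "Wed Sep 13 17:38:21 -0700 2017"; the actual UDF may differ (for example, in how it handles the timezone offset):

```python
from datetime import datetime
from time import strptime

from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType


def string_to_datetime(datestring):
    """Parse a date string like 'Wed Sep 13 17:38:21 -0700 2017' (timezone offset ignored)."""
    parts = datestring.split()                # ['Wed', 'Sep', '13', '17:38:21', '-0700', '2017']
    month = strptime(parts[1], '%b').tm_mon   # month abbreviation -> month number
    hour, minute, second = (int(v) for v in parts[3].split(':'))
    return datetime(int(parts[5]), month, int(parts[2]), hour, minute, second)


# Wrapped as a Spark UDF, mirroring the decorator style used in goodreads_udf.py
stringtodatetime = udf(string_to_datetime, TimestampType())
```

For example, `string_to_datetime('Wed Sep 13 17:38:21 -0700 2017')` returns `datetime(2017, 9, 13, 17, 38, 21)`.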