
Commit

Adding faker module
san089 committed Feb 17, 2020
1 parent 5f791cf commit cd3fdb5
Showing 11 changed files with 250 additions and 97 deletions.
9 changes: 9 additions & 0 deletions .gitignore
@@ -0,0 +1,9 @@
#Ignore Config Files
src/config.cfg
src/warehouse/warehouse_config.cfg

#Ignore Idea Files
.idea/

#Ignore Cache Files
src/warehouse/__pycache__/
53 changes: 53 additions & 0 deletions Utility/bootstrap_script.txt
@@ -0,0 +1,53 @@
pip-3.6 install boto3 --user

sudo pip install -U \
awscli \
boto \
ciso8601 \
ujson \
workalendar


export PYSPARK_DRIVER_PYTHON=python3
export PYSPARK_PYTHON=python3


# Make all the objects of the bucket public
{
"Id": "...",
"Statement": [ {
"Sid": "...",
"Action": [
"s3:GetObject"
],
"Effect": "Allow",
"Resource": "arn:aws:s3:::bucket/*",
"Principal": {
"AWS": [ "*" ]
}
} ]
}
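
For reference, a policy of this shape could also be attached programmatically with boto3 (a minimal sketch; the bucket name below is a placeholder and AWS credentials are assumed to be configured):

# Sketch: attach a public-read policy to a bucket via boto3 (bucket name is a placeholder).
import json
import boto3

public_read_policy = {
    "Statement": [{
        "Sid": "PublicRead",
        "Effect": "Allow",
        "Principal": {"AWS": ["*"]},
        "Action": ["s3:GetObject"],
        "Resource": "arn:aws:s3:::my-example-bucket/*"
    }]
}
boto3.client('s3').put_bucket_policy(Bucket='my-example-bucket',
                                     Policy=json.dumps(public_read_policy))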


# If Redshift is not able to access public S3 buckets, try enabling Enhanced VPC routing:
Go to Redshift cluster -> Network and security -> Enhanced VPC routing -> Enable it.


# Installing psycopg2

First install postgresql-libs and postgresql-devel; both are dependencies of psycopg2.

sudo yum install postgresql-libs
sudo yum install postgresql-devel

Then run :
sudo pip install psycopg2
or try
sudo pip-3.6 install psycopg2
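
A quick connection test once psycopg2 is installed might look like the sketch below (host, database, and credentials are placeholders, not values from this repo):

# Sketch: verify the psycopg2 install by opening a connection (all values are placeholders).
import psycopg2

conn = psycopg2.connect(host='example-cluster.abc123.us-west-2.redshift.amazonaws.com',
                        port=5439, dbname='dev', user='awsuser', password='********')
cur = conn.cursor()
cur.execute('SELECT 1')
print(cur.fetchone())
conn.close()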






19 changes: 15 additions & 4 deletions goodreadsfaker/generate_fake_data.py
@@ -4,11 +4,12 @@
from datetime import datetime
import os
import csv
import argparse

class GoodreadsFake:

def __init__(self):
self._faker = Faker(['it_IT', 'en_US', 'ja_JP', 'hi_IN'])
self._faker = Faker(['it_IT', 'en_US', 'hi_IN', 'ja_JP'])
self._fake_books = ["Vacation People","Enter the Aardvark","Murder Makes Scents","A Blink of the Screen: Collected Shorter Fiction","Living Your Dreams: How to make a living doing what you love",
"The Bedroom Experiment(Hot Jocks #5.5)","Sweet Soul (Sweet Home, #4; Carillo Boys, #3)","Would Like to Meet","House of Earth and Blood","Ugly Betty: The Book",
"The Favorite Daughter","The East End","Jinn'sDominion (Desert Cursed, #3","Pine & Boof: The Lucky Leaf","Beyond Belief: My Secret Life Inside Scientology and My Harrowing Escape",
@@ -42,12 +43,14 @@ def _write_to_disk(self, module_name, module_data):
if (len(module_data) > 0):
pd \
.DataFrame(module_data) \
.to_csv(path_or_buf=file, index=False, mode=write_mode, header=header, quoting=csv.QUOTE_MINIMAL)
.to_csv(path_or_buf=file, sep=',',index=False, mode=write_mode, header=header, quoting=csv.QUOTE_MINIMAL, encoding='utf-8')
self._user_data_list = list()
self._review_data_list = list()
self._author_data_list = list()
self._book_data_list = list()

def _clean_text(cls, text):
return ' '.join((text.replace('\n','')).split())

def _generate_fake_review_obj(self):
return {
@@ -57,7 +60,7 @@ def _generate_fake_review_obj(self):
"user_id" : self._faker.random_int(0, 100000),
"book_id" : self._faker.random_int(0, 100000),
"author_id" : self._faker.random_int(0, 100000),
"review_text" : self._faker.text(),
"review_text" : self._clean_text(self._faker.text()),
"review_rating" : self._faker.pyfloat(right_digits = 2, min_value =0, max_value = 5),
"review_votes" : self._faker.random_int(0, 1000000),
"spoiler_flag" : bool(self._faker.random_int(0, 1)),
@@ -187,6 +190,14 @@ def _parse_author_data(self, review_obj):


if __name__ == '__main__':

parser = argparse.ArgumentParser(
description="A fake data generator for GoodReads reviews.")

parser._action_groups.pop()
required = parser.add_argument_group('required arguments')
required.add_argument("-n", "--num_records", type=int, metavar='', required=True, help="Number of records to generate.")
args = parser.parse_args()
fk = GoodreadsFake()
for i in range(100):
for i in range(args.num_records):
fk.generate()
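
With the argparse change above, the record count is now a required flag; the __main__ block is equivalent to the following programmatic usage (a sketch, assuming the goodreadsfaker package is importable):

# Sketch: programmatic equivalent of "python generate_fake_data.py -n 1000".
from goodreadsfaker.generate_fake_data import GoodreadsFake

fk = GoodreadsFake()
for _ in range(1000):   # 1000 stands in for the --num_records value
    fk.generate()
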
1 change: 1 addition & 0 deletions src/goodreads.log
@@ -0,0 +1 @@
2020-02-14 00:21:59,948 root DEBUG Hellow
49 changes: 38 additions & 11 deletions src/goodreads_transform.py
@@ -1,17 +1,27 @@
from pyspark.sql.types import StringType
from pyspark.sql import functions as fn
import goodreads_udf
import logging
import configparser
from pathlib import Path

logger = logging.getLogger(__name__)

config = configparser.ConfigParser()
config.read_file(open(f"{Path(__file__).parents[0]}/config.cfg"))

class GoodreadsTransform:

def __init__(self, spark):
self._spark = spark
self._load_path = 's3://' + config.get('BUCKET', 'WORKING_ZONE')
self._save_path = 's3://' + config.get('BUCKET', 'PROCESSED_ZONE')


def transform_author_dataset(self):
logging.debug("Inside transform author dataset module")
author_df = \
self._spark.read.csv('s3://goodreads-working-zone/author.csv', header=True, mode='PERMISSIVE',inferSchema=True)
self._spark.read.csv( self._load_path + '/author.csv', header=True, mode='PERMISSIVE',inferSchema=True)

author_lookup_df = author_df.groupBy('author_id')\
.agg(fn.max('record_create_timestamp').alias('record_create_timestamp'))
@@ -23,14 +33,18 @@ def transform_author_dataset(self):
.select(author_df.columns) \
.withColumn('name', goodreads_udf.remove_extra_spaces('name'))

deduped_author_df.coalesce(3).write.csv(path='s3://goodreads-processed-zone/author/', mode='overwrite', \
compression='gzip', header=True)
logging.debug(f"Attempting to write data to {self._save_path + '/authors/'}")
deduped_author_df\
.coalesce(3)\
.write\
.csv(path = self._save_path + '/authors/', sep = '|', mode='overwrite', compression='gzip', header=True, timestampFormat = 'yyyy-MM-dd HH:mm:ss.SSS', quote = '"', escape = '"')



def transform_reviews_dataset(self):
logging.debug("Inside transform reviews dataset module")
reviews_df = self._spark.read \
.csv('s3://goodreads-working-zone/reviews.csv', header=True, \
.csv(self._load_path + '/reviews.csv', header=True, \
mode = 'PERMISSIVE', inferSchema=True, quote = "\"", escape = "\"")

reviews_lookup_df = reviews_df\
@@ -48,12 +62,17 @@ def transform_reviews_dataset(self):
.withColumn('review_added_date', goodreads_udf.stringtodatetime('review_added_date')) \
.withColumn('review_updated_date', goodreads_udf.stringtodatetime('review_updated_date'))

deduped_reviews_df.coalesce(10).write.csv(path='s3://goodreads-processed-zone/reviews/', mode='overwrite',
compression='gzip', header=True)

logging.debug(f"Attempting to write data to {self._save_path + '/reviews/'}")
deduped_reviews_df\
.coalesce(3)\
.write\
.csv(path = self._save_path + '/reviews/', sep = '|', mode='overwrite', compression='gzip', header=True, timestampFormat = 'yyyy-MM-dd HH:mm:ss.SSS', quote = '"', escape = '"')


def transform_books_dataset(self):
books_df = self._spark.read.csv('s3://goodreads-working-zone/book.csv', header=True, mode='PERMISSIVE',
logging.debug("Inside transform books dataset module")
books_df = self._spark.read.csv(self._load_path + '/book.csv', header=True, mode='PERMISSIVE',
inferSchema=True, quote="\"", escape="\"")

books_lookup_df = books_df\
@@ -65,11 +84,16 @@ def transform_books_dataset(self):
deduped_books_df = books_df\
.join(books_lookup_df, ['book_id', 'record_create_timestamp'], how='inner')\
.select(books_df.columns)
deduped_books_df.coalesce(10).write.csv(path='s3://goodreads-processed-zone/books/', mode='overwrite', \
compression='gzip', header=True)

logging.debug(f"Attempting to write data to {self._save_path + '/books/'}")
deduped_books_df\
.coalesce(3)\
.write\
.csv(path = self._save_path + '/books/', sep = '|', mode='overwrite', compression='gzip', header=True, timestampFormat = 'yyyy-MM-dd HH:mm:ss.SSS', quote = '"', escape = '"')


def tranform_users_dataset(self):
logging.debug("Inside transform users dataset module")
users_df = self._spark.read.csv('s3://goodreads-working-zone/user.csv', header=True, mode='PERMISSIVE',
inferSchema=True, quote="\"", escape="\"")

@@ -84,5 +108,8 @@ def tranform_users_dataset(self):
.join(users_lookup_df, ['user_id', 'record_create_timestamp'], how='inner')\
.select(users_df.columns)

deduped_users_df.coalesce(3).write.csv(path='s3://goodreads-processed-zone/users/', mode='overwrite',
compression='gzip', header=True)
logging.debug(f"Attempting to write data to {self._save_path + '/users/'}")
deduped_users_df\
.coalesce(3)\
.write\
.csv(path = self._save_path + '/users/', sep = '|', mode='overwrite', compression='gzip', header=True, timestampFormat = 'yyyy-MM-dd HH:mm:ss.SSS', quote = '"', escape = '"')
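
Every transform above follows the same keep-latest-record pattern: group by the entity key, take the max record_create_timestamp, and join back to drop older duplicates. A standalone sketch of that pattern (bucket paths and the Spark session are placeholders):

# Sketch of the dedup-by-latest-timestamp pattern used by the transforms above.
from pyspark.sql import SparkSession, functions as fn

spark = SparkSession.builder.appName('dedup-sketch').getOrCreate()
reviews_df = spark.read.csv('s3://example-working-zone/reviews.csv',   # placeholder path
                            header=True, mode='PERMISSIVE', inferSchema=True)

latest_df = reviews_df.groupBy('review_id') \
    .agg(fn.max('record_create_timestamp').alias('record_create_timestamp'))

deduped_df = reviews_df \
    .join(latest_df, ['review_id', 'record_create_timestamp'], how='inner') \
    .select(reviews_df.columns)

deduped_df.coalesce(3).write.csv(path='s3://example-processed-zone/reviews/',   # placeholder path
                                 sep='|', mode='overwrite', compression='gzip', header=True)
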
10 changes: 8 additions & 2 deletions src/logging.ini
@@ -2,20 +2,26 @@
keys=root

[handlers]
keys=stream_handler
keys=stream_handler, file_handler

[formatters]
keys=formatter

[logger_root]
level=DEBUG
handlers=stream_handler
handlers=stream_handler, file_handler

[handler_stream_handler]
class=StreamHandler
level=DEBUG
formatter=formatter
args=(sys.stderr,)

[handler_file_handler]
class=FileHandler
level=DEBUG
formatter=formatter
args=('goodreads.log','w')

[formatter_formatter]
format=%(asctime)s %(name)-12s %(levelname)-8s %(message)s
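
With the file_handler section added, loading this config wires up both the console output and goodreads.log; a minimal sketch (the .ini path is assumed to be resolved relative to src/):

# Sketch: load the logging.ini above so both stream and file handlers are active.
import logging
import logging.config

logging.config.fileConfig('logging.ini', disable_existing_loggers=False)  # path assumed relative to src/
logger = logging.getLogger(__name__)
logger.debug('Logging configured with stream and file handlers')
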
15 changes: 10 additions & 5 deletions src/s3_module.py
@@ -1,11 +1,12 @@
import boto3
import configparser
import logging
from pathlib import Path

logger = logging.getLogger(__name__)

config = configparser.ConfigParser()
config.read_file(open('config.cfg'))
config.read_file(open(f"{Path(__file__).parents[0]}/config.cfg"))

class GoodReadsS3Module:

@@ -18,25 +19,29 @@ def __init__(self):
self._processed_zone = config.get('BUCKET','PROCESSED_ZONE')

def s3_move_data(self, source_bucket = None, target_bucket= None):

if source_bucket is None:
source_bucket = self._landing_zone
if target_bucket is None:
target_bucket = self._working_zone

logging.debug(f"Inside s3_move_data : Source bucket set is : {source_bucket}\n Target bucket set is : {target_bucket}")

# cleanup target bucket
self.clean_bucket(target_bucket)

# Move files to working zone
for key in self.check_files_exists(source_bucket):
for key in self.get_files(source_bucket):
if key in config.get('FILES','NAME').split(","):
logging.debug(f"Copying file {key} from {source_bucket} to {target_bucket}")
self._s3.meta.client.copy({'Bucket': source_bucket, 'Key': key}, target_bucket, key)

# cleanup source bucket
self.clean_bucket(source_bucket)

def check_files_exists(self, bucket_name):
def get_files(self, bucket_name):
logging.debug(f"Inspecting bucket : {bucket_name} for files present")
return [bucket_object.key for bucket_object in self._s3.Bucket(bucket_name).objects.all()]

def clean_bucket(self, bucket_name):
self._s3.Bucket(bucket_name).objects.all().delete()
logging.debug(f"Cleaning bucket : {bucket_name}")
self._s3.Bucket(bucket_name).objects.all().delete()
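
For context, a usage sketch of the module above (bucket names come from config.cfg; run from src/ so the import resolves):

# Sketch: move the configured files from the landing zone to the working zone.
from s3_module import GoodReadsS3Module

s3_module = GoodReadsS3Module()
s3_module.s3_move_data()                        # defaults: landing zone -> working zone
print(s3_module.get_files(s3_module._working_zone))  # _working_zone is read from config.cfg in __init__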