diff --git a/Utility/bootstrap_script.txt b/Utility/bootstrap_script.txt
index b17f525..6c77b0a 100644
--- a/Utility/bootstrap_script.txt
+++ b/Utility/bootstrap_script.txt
@@ -46,6 +46,8 @@
 sudo pip install psycopg2
 or try
 sudo pip-3.6 install psycopg2
 
+ssh hadoop@ec2-3-235-6-13.compute-1.amazonaws.com -i EMR_KEY_PAIR.pem "cd /home/hadoop/goodreads_etl_pipeline/src;export PYSPARK_DRIVER_PYTHON=python3;export PYSPARK_PYTHON=python3;spark-submit --master yarn goodreads_driver.py;"
+
diff --git a/docs/images/goodreads_dag.PNG b/docs/images/goodreads_dag.PNG
new file mode 100644
index 0000000..50353e4
Binary files /dev/null and b/docs/images/goodreads_dag.PNG differ
diff --git a/goodreadsfaker/generate_fake_data.py b/goodreadsfaker/generate_fake_data.py
index c4101cb..d2fcc8a 100644
--- a/goodreadsfaker/generate_fake_data.py
+++ b/goodreadsfaker/generate_fake_data.py
@@ -24,17 +24,18 @@ def __init__(self):
         self._book_data_list = list()
         self._base_directory = "D:\GoodReadsData\\fake"
 
-    def generate(self):
-        review_obj = self._generate_fake_review_obj()
-        self._review_data_list.append(self._parse_review_data(review_obj))
-        self._user_data_list.append(self._parse_user_data(review_obj))
-        self._author_data_list.append(self._parse_author_data(review_obj))
-        self._book_data_list.append(self._parse_book_data(review_obj))
-
+    def generate(self, num_records):
+        for i in range(num_records):
+            review_obj = self._generate_fake_review_obj()
+            self._review_data_list.append(self._parse_review_data(review_obj))
+            self._user_data_list.append(self._parse_user_data(review_obj))
+            self._author_data_list.append(self._parse_author_data(review_obj))
+            self._book_data_list.append(self._parse_book_data(review_obj))
         for module_name, module_data in zip(["reviews", "user", "author", "book"], [self._review_data_list, self._user_data_list, self._author_data_list, self._book_data_list]):
-            self._write_to_disk(module_name, module_data)
+            self._write_to_disk(module_name, module_data)
+        self._clear_modules()
 
     def _write_to_disk(self, module_name, module_data):
         file = os.path.join(self._base_directory, f"{module_name}.csv")
@@ -44,10 +45,13 @@ def _write_to_disk(self, module_name, module_data):
         pd \
             .DataFrame(module_data) \
             .to_csv(path_or_buf=file, sep=',',index=False, mode=write_mode, header=header, quoting=csv.QUOTE_MINIMAL, encoding='utf-8')
-        self._user_data_list = list()
-        self._review_data_list = list()
-        self._author_data_list = list()
-        self._book_data_list = list()
+
+
+    def _clear_modules(self):
+        self._user_data_list = list()
+        self._review_data_list = list()
+        self._author_data_list = list()
+        self._book_data_list = list()
 
     def _clean_text(cls, text):
         return ' '.join((text.replace('\n','')).split())
@@ -56,7 +60,7 @@ def _generate_fake_review_obj(self):
         return {
             #Fake review
-            "review_id" : self._faker.random_int(0, 100000),
+            "review_id" : self._faker.random_int(0, 10000000),
             "user_id" : self._faker.random_int(0, 100000),
             "book_id" : self._faker.random_int(0, 100000),
             "author_id" : self._faker.random_int(0, 100000),
@@ -199,5 +203,6 @@ def _parse_author_data(self, review_obj):
     required.add_argument("-n", "--num_records", type=int, metavar='', required=True, help="Number of records to genertae.")
     args = parser.parse_args()
     fk = GoodreadsFake()
-    for i in range(args.num_records):
-        fk.generate()
\ No newline at end of file
+    for i in range(100):
+        print(f"Running iteration : {i}")
+        fk.generate(args.num_records)
\ No newline at end of file
diff --git a/src/goodreads_transform.py b/src/goodreads_transform.py
index d3c13d7..6d6a946 100644
--- a/src/goodreads_transform.py
+++ b/src/goodreads_transform.py
@@ -14,8 +14,8 @@ class GoodreadsTransform:
     def __init__(self, spark):
         self._spark = spark
-        self._load_path = 's3://' + config.get('BUCKET', 'WORKING_ZONE')
-        self._save_path = 's3://' + config.get('BUCKET', 'PROCESSED_ZONE')
+        self._load_path = 's3a://' + config.get('BUCKET', 'WORKING_ZONE')
+        self._save_path = 's3a://' + config.get('BUCKET', 'PROCESSED_ZONE')
 
     def transform_author_dataset(self):