PACKT_FREE_LEARNING_URL = 'https://www.packtpub.com/packt/offers/free-learning/'
PACKT_RECAPTCHA_SITE_KEY = '6LeAHSgUAAAAAKsn5jo6RUSTLVxGNYyuvUcLMe0_'

# Size (in bytes) of the chunks a download is streamed in; also used to estimate progress.
_DOWNLOAD_CHUNK_SIZE = 1024


def get_all_books_data(api_client):
    """Fetch data of every product owned by the user.

    The products endpoint is queried once for the total count and then once per
    pagination page.

    :param api_client: client object exposing ``get(url, params=...)``.
    :return: list of ``{'id': ..., 'title': ...}`` dicts; an empty list when the
        data (or any single page of it) could not be fetched, so that callers
        can always iterate over the result.
    """
    logger.info("Getting your books data...")
    try:
        response = api_client.get(PACKT_API_PRODUCTS_URL)
        pages_total = int(ceil(response.json().get('count') / DEFAULT_PAGINATION_SIZE))
        # A failed page yields None, which raises TypeError while chaining and
        # is handled below together with a malformed count response.
        my_books_data = list(chain.from_iterable(
            get_single_page_books_data(api_client, page) for page in range(pages_total)
        ))
    except (AttributeError, TypeError):
        logger.error('Couldn\'t fetch user\'s books data.')
        return []
    logger.info('Books data has been successfully fetched.')
    return my_books_data


def get_single_page_books_data(api_client, page):
    """Fetch products data from a single pagination page of the products API.

    :param api_client: client object exposing ``get(url, params=...)``.
    :param page: zero-based page index; the offset is ``DEFAULT_PAGINATION_SIZE * page``.
    :return: list of ``{'id': ..., 'title': ...}`` dicts, or None when the page
        could not be fetched or parsed (the failure is logged).
    """
    try:
        response = api_client.get(
            PACKT_API_PRODUCTS_URL,
            params={
                'sort': 'createdAt:DESC',
                'offset': DEFAULT_PAGINATION_SIZE * page,
                'limit': DEFAULT_PAGINATION_SIZE
            }
        )
        return [{'id': t['productId'], 'title': t['productName']} for t in response.json().get('data')]
    except Exception:
        # Deliberately best-effort: the caller treats a missing page as a failed fetch.
        logger.error('Couldn\'t fetch page {} of user\'s books data.'.format(page))
        return None


def claim_product(api_client, anticaptcha_key):
    """Claim the Packt Free Learning offer valid for the current UTC day.

    :param api_client: client object exposing ``get`` and ``put``.
    :param anticaptcha_key: API key passed to ``solve_recaptcha``.
    :return: ``{'id': ..., 'title': ...}`` of the offered product, or None when
        the product summary request did not return HTTP 200.
    """
    logger.info("Start grabbing ebook...")

    # The offers endpoint is queried with naive ISO timestamps spanning the current UTC day.
    utc_today = dt.datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
    offer_response = api_client.get(
        PACKT_API_FREE_LEARNING_OFFERS_URL,
        params={
            'dateFrom': utc_today.isoformat(),
            'dateTo': (utc_today + dt.timedelta(days=1)).isoformat()
        }
    )
    # Exactly one offer is expected; anything else raises ValueError on unpacking.
    [offer_data] = offer_response.json().get('data')
    offer_id = offer_data.get('id')
    product_id = offer_data.get('productId')

    user_response = api_client.get(PACKT_API_USER_URL)
    [user_data] = user_response.json().get('data')
    user_id = user_data.get('id')

    product_response = api_client.get(PACKT_PRODUCT_SUMMARY_URL.format(product_id=product_id))
    product_data = None
    if product_response.status_code == 200:
        product_data = {'id': product_id, 'title': product_response.json()['title']}
    # Fall back to the product id in log messages so that a failed summary
    # request doesn't crash the claim with "'NoneType' is not subscriptable".
    product_title = product_data['title'] if product_data else product_id

    # "or []" keeps the check working if the owned products couldn't be fetched.
    owned_products = get_all_books_data(api_client) or []
    if any(product_id == book['id'] for book in owned_products):
        logger.info('You have already claimed Packt Free Learning "{}" offer.'.format(product_title))
        return product_data

    logger.info('Started solving ReCAPTCHA on Packt Free Learning website...')
    recaptcha_solution = solve_recaptcha(anticaptcha_key, PACKT_FREE_LEARNING_URL, PACKT_RECAPTCHA_SITE_KEY)

    claim_response = api_client.put(
        PACKT_API_FREE_LEARNING_CLAIM_URL.format(user_id=user_id, offer_id=offer_id),
        json={'recaptcha': recaptcha_solution}
    )

    if claim_response.status_code == 200:
        logger.info('A new Packt Free Learning ebook "{}" has been grabbed!'.format(product_title))
    elif claim_response.status_code == 409:
        logger.info('You have already claimed Packt Free Learning "{}" offer.'.format(product_title))
    else:
        logger.error('Claiming Packt Free Learning book has failed.')

    return product_data


class ConfigurationModel(object):
    """Give granular, lazy access to the values stored in the configuration file.

    Each property reads only the section it needs, so a command that doesn't
    download anything doesn't require a valid ``DOWNLOAD_DATA`` section.
    """

    def __init__(self, cfg_file_path):
        """Parse the configuration file located at ``cfg_file_path``."""
        self.configuration = configparser.ConfigParser()
        self.configuration.read(cfg_file_path)

    @property
    def packt_login_credentials(self):
        """Return Packt user login credentials as an ``(email, password)`` tuple."""
        return self.configuration.get('LOGIN_DATA', 'email'), self.configuration.get('LOGIN_DATA', 'password')

    @property
    def anticaptcha_api_key(self):
        """Return AntiCaptcha API key."""
        return self.configuration.get("ANTICAPTCHA_DATA", 'key')

    @property
    def config_download_data(self):
        """Return download configuration as ``(download_path, download_formats)``.

        ``download_formats`` is a tuple of format names with all spaces removed.
        Empty entries (blank option, doubled or trailing commas) are dropped, so
        a blank option yields an empty tuple that callers can replace with
        their own default.

        :raises ValueError: when the configured download folder doesn't exist.
        """
        download_path = self.configuration.get("DOWNLOAD_DATA", 'download_folder_path')
        if not os.path.exists(download_path):
            message = "Download folder path: '{}' doesn't exist".format(download_path)
            logger.error(message)
            raise ValueError(message)
        raw_formats = self.configuration.get("DOWNLOAD_DATA", 'download_formats').split(',')
        stripped_formats = (form.replace(' ', '') for form in raw_formats)
        download_formats = tuple(form for form in stripped_formats if form)
        return download_path, download_formats


class PacktConnectionError(ConnectionError):
    """Error raised whenever fetching data from Packt API fails."""
    pass


def slugify_product_name(title):
    """Return book title with spaces replaced by underscore and unicodes replaced by characters valid in filenames."""
    return slugify(title, separator='_', lowercase=False)


def get_product_download_urls(api_client, product_id):
    """Return a mapping of available file type -> download URL for a product.

    :return: dict keyed by file type; an empty dict when the file types request
        doesn't return HTTP 200.
    :raises PacktConnectionError: when the request or parsing its response fails.
    """
    error_message = 'Couldn\'t fetch download URLs for product {}.'.format(product_id)
    try:
        response = api_client.get(PACKT_API_PRODUCT_FILE_TYPES_URL.format(product_id=product_id))
        if response.status_code != 200:
            logger.info(error_message)
            return {}
        return {
            file_type: PACKT_API_PRODUCT_FILE_DOWNLOAD_URL.format(product_id=product_id, file_type=file_type)
            for file_type in response.json().get('data')[0].get('fileTypes')
        }
    except Exception as err:
        raise PacktConnectionError(error_message) from err


def _stream_response_to_file(response, temp_file_path, full_file_path, is_interactive):
    """Stream ``response`` into ``temp_file_path`` and move it to ``full_file_path`` once complete."""
    content_length = response.headers.get('content-length')
    # Without a Content-Length header the progress can't be estimated, but the
    # download itself is still perfectly valid - just skip the progress bar.
    num_of_chunks = None
    if content_length is not None:
        num_of_chunks = int(content_length) / _DOWNLOAD_CHUNK_SIZE + 1
    show_progress = is_interactive and num_of_chunks is not None
    try:
        with open(temp_file_path, 'wb') as f:
            for num, chunk in enumerate(response.iter_content(chunk_size=_DOWNLOAD_CHUNK_SIZE)):
                if not chunk:
                    continue
                if show_progress:
                    # Clamp: more chunks than estimated must not look like "end of line".
                    update_download_progress_bar(min(num / num_of_chunks, 1.0))
                f.write(chunk)
            if show_progress:
                update_download_progress_bar(-1)  # add end of line
        os.rename(temp_file_path, full_file_path)
    finally:
        # Never leave a partial download behind.
        if os.path.isfile(temp_file_path):
            os.remove(temp_file_path)


def download_products(api_client, download_directory, formats, product_list, into_folder=False):
    """Download selected products.

    :param api_client: client object exposing ``get``.
    :param download_directory: existing directory the files are saved to.
    :param formats: collection of file types to download (e.g. 'pdf', 'code').
    :param product_list: iterable of ``{'id': ..., 'title': ...}`` dicts.
    :param into_folder: when True every product gets its own sub-folder named
        after its slugified title.
    :raises PacktConnectionError: when download URLs of a product can't be fetched.

    Failures of single file downloads are logged and don't stop the remaining ones.
    """
    nr_of_books_downloaded = 0
    is_interactive = sys.stdout.isatty()
    for book in product_list:
        download_urls = get_product_download_urls(api_client, book['id'])
        for file_type, download_url in download_urls.items():
            if file_type not in formats:
                continue
            # Both 'video' and 'code' are saved as "<name>.zip", so code is skipped
            # whenever the video of the same product is going to be downloaded.
            if file_type == 'code' and 'video' in download_urls and 'video' in formats:
                continue

            file_extention = 'zip' if file_type in ('video', 'code') else file_type
            file_name = slugify_product_name(book['title'])
            logger.info('Title: "{}"'.format(book['title']))
            if into_folder:
                target_download_path = os.path.join(download_directory, file_name)
                if not os.path.isdir(target_download_path):
                    os.mkdir(target_download_path)
            else:
                target_download_path = os.path.join(download_directory)
            full_file_path = os.path.join(target_download_path, '{}.{}'.format(file_name, file_extention))
            temp_file_path = os.path.join(target_download_path, 'download.tmp')

            if os.path.isfile(full_file_path):
                logger.info('"{}.{}" already exists under the given path.'.format(file_name, file_extention))
                continue

            if file_type == 'code':
                logger.info('Downloading code for ebook: "{}"...'.format(book['title']))
            elif file_type == 'video':
                logger.info('Downloading "{}" video...'.format(book['title']))
            else:
                logger.info('Downloading ebook: "{}" in {} format...'.format(book['title'], file_type))

            try:
                file_url = api_client.get(download_url).json().get('data')
                r = api_client.get(file_url, timeout=100, stream=True)
                # "==", not "is": identity of int objects is an implementation detail.
                if r.status_code == 200:
                    _stream_response_to_file(r, temp_file_path, full_file_path, is_interactive)
                    if file_type == 'code':
                        logger.success('Code for ebook "{}" downloaded successfully!'.format(book['title']))
                    else:
                        logger.success('Ebook "{}" in {} format downloaded successfully!'.format(
                            book['title'],
                            file_type
                        ))
                    nr_of_books_downloaded += 1
                else:
                    logger.error('Couldn\'t download "{}" ebook in {} format.'.format(book['title'], file_type))
            except Exception as e:
                # Best-effort: one broken file must not prevent the other downloads.
                logger.error(e)
    logger.info("{} ebooks have been downloaded!".format(str(nr_of_books_downloaded)))


def update_download_progress_bar(current_work_done):
    """Print a progress bar for a value in range 0.0 - 1.0; for any other value print a line break.

    The bar is redrawn in place (carriage return, no trailing newline), so a
    caller finishes it by passing an out-of-range value such as -1.
    """
    if 0.0 <= current_work_done <= 1.0:
        print(
            "\r[PROGRESS] - [{0:50s}] {1:.1f}% ".format('#' * int(current_work_done * 50), current_work_done * 100),
            end="", )
    else:
        print("")
out!" -PACKT_FREE_LEARNING_URL = 'https://www.packtpub.com/packt/offers/free-learning/' -PACKT_RECAPTCHA_SITE_KEY = '6LeAHSgUAAAAAKsn5jo6RUSTLVxGNYyuvUcLMe0_' - - -def slugify_book_title(title): - """Return book title with spaces replaced by underscore and unicodes replaced by characters valid in filenames.""" - return slugify(title, separator='_', lowercase=False) - - -class PacktConnectionError(ConnectionError): - """Error raised whenever fetching data from Packt page fails.""" - pass - - -class ConfigurationModel(object): - """Contains all needed urls, passwords and packtpub account data stored in .cfg file""" - - def __init__(self, cfg_file_path): - self.configuration = configparser.ConfigParser() - self.configuration.read(cfg_file_path) - self.anticaptcha_clientkey = self.configuration.get("ANTICAPTCHA_DATA", 'key') - self.my_packt_email, self.my_packt_password = self._get_config_login_data() - self.download_folder_path, self.download_formats = self._get_config_download_data() - - def _get_config_login_data(self): - """Gets user login credentials.""" - email = self.configuration.get("LOGIN_DATA", 'email') - password = self.configuration.get("LOGIN_DATA", 'password') - return email, password - - def _get_config_download_data(self): - """Downloads ebook data from the user account.""" - download_path = self.configuration.get("DOWNLOAD_DATA", 'download_folder_path') - if not os.path.exists(download_path): - message = "Download folder path: '{}' doesn't exist".format(download_path) - logger.error(message) - raise ValueError(message) - download_formats = tuple(form.replace(' ', '') for form in - self.configuration.get("DOWNLOAD_DATA", 'download_formats').split(',')) - return download_path, download_formats - - -class PacktPublishingFreeEbook(object): - """Contains some methods to claim, download or send a free daily ebook""" - - download_formats = ('pdf', 'mobi', 'epub', 'video', 'code') - - def __init__(self, cfg): - self.cfg = cfg - self.book_data = None - - def 
get_all_books_data(self, api_client): - """Fetch all user's ebooks data.""" - logger.info("Getting your books data...") - try: - response = api_client.get(PACKT_API_PRODUCTS_URL) - pages_total = int(ceil(response.json().get('count') / DEFAULT_PAGINATION_SIZE)) - my_books_data = list(chain(*map( - lambda page: self.get_single_page_books_data(api_client, page), - range(pages_total) - ))) - logger.info('Books data has been successfully fetched.') - return my_books_data - except (AttributeError, TypeError): - logger.error('Couldn\'t fetch user\'s books data.') - - def get_single_page_books_data(self, api_client, page): - """Fetch ebooks data from single products API pagination page.""" - try: - response = api_client.get( - PACKT_API_PRODUCTS_URL, - params={ - 'sort': 'createdAt:DESC', - 'offset': DEFAULT_PAGINATION_SIZE * page, - 'limit': DEFAULT_PAGINATION_SIZE - } - ) - return [{'id': t['productId'], 'title': t['productName']} for t in response.json().get('data')] - except Exception: - logger.error('Couldn\'t fetch page {} of user\'s books data.'.format(page)) - - def solve_packt_recapcha(self): - """Solve Packt Free Learning website site ReCAPTCHA.""" - logger.info('Started solving ReCAPTCHA on Packt Free Learning website...') - anticaptcha = Anticaptcha(self.cfg.anticaptcha_clientkey) - return anticaptcha.solve_recaptcha(PACKT_FREE_LEARNING_URL, PACKT_RECAPTCHA_SITE_KEY) - - def grab_ebook(self, api_client): - """Grab Packt Free Learning ebook.""" - logger.info("Start grabbing ebook...") - - utc_today = dt.datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) - offer_response = api_client.get( - PACKT_API_FREE_LEARNING_OFFERS_URL, - params={ - 'dateFrom': utc_today.isoformat(), - 'dateTo': (utc_today + dt.timedelta(days=1)).isoformat() - } - ) - [offer_data] = offer_response.json().get('data') - offer_id = offer_data.get('id') - product_id = offer_data.get('productId') - - user_response = api_client.get(PACKT_API_USER_URL) - [user_data] = 
user_response.json().get('data') - user_id = user_data.get('id') - - product_response = api_client.get(PACKT_PRODUCT_SUMMARY_URL.format(product_id=product_id)) - self.book_data = {'id': product_id, 'title': product_response.json()['title']}\ - if product_response.status_code == 200 else None - - if any(product_id == book['id'] for book in self.get_all_books_data(api_client)): - logger.info('You have already claimed Packt Free Learning "{}" offer.'.format(self.book_data['title'])) - return - - claim_response = api_client.put( - PACKT_API_FREE_LEARNING_CLAIM_URL.format(user_id=user_id, offer_id=offer_id), - json={'recaptcha': self.solve_packt_recapcha()} - ) - - if claim_response.status_code == 200: - logger.info('A new Packt Free Learning ebook "{}" has been grabbed!'.format(self.book_data['title'])) - elif claim_response.status_code == 409: - logger.info('You have already claimed Packt Free Learning "{}" offer.'.format(self.book_data['title'])) - else: - logger.error('Claiming Packt Free Learning book has failed.') - - def download_books(self, api_client, product_data=None, formats=None, into_folder=False): - """Download selected products.""" - def get_product_download_urls(product_id): - error_message = 'Couldn\'t fetch download URLs for product {}.'.format(product_id) - try: - response = api_client.get(PACKT_API_PRODUCT_FILE_TYPES_URL.format(product_id=product_id)) - if response.status_code == 200: - return { - format: PACKT_API_PRODUCT_FILE_DOWNLOAD_URL.format(product_id=product_id, file_type=format) - for format in response.json().get('data')[0].get('fileTypes') - } - else: - logger.info(error_message) - return {} - except Exception: - raise PacktConnectionError(error_message) - # download ebook - my_books_data = [product_data] if product_data else self.get_all_books_data(api_client) - formats = formats or self.cfg.download_formats or self.download_formats - - nr_of_books_downloaded = 0 - is_interactive = sys.stdout.isatty() - for book in my_books_data: - 
download_urls = get_product_download_urls(book['id']) - for format, download_url in download_urls.items(): - if format in formats and not (format == 'code' and 'video' in download_urls and 'video' in formats): - file_extention = 'zip' if format in ('video', 'code') else format - file_name = slugify_book_title(book['title']) - logger.info('Title: "{}"'.format(book['title'])) - if into_folder: - target_download_path = os.path.join(self.cfg.download_folder_path, file_name) - if not os.path.isdir(target_download_path): - os.mkdir(target_download_path) - else: - target_download_path = os.path.join(self.cfg.download_folder_path) - full_file_path = os.path.join(target_download_path, '{}.{}'.format(file_name, file_extention)) - temp_file_path = os.path.join(target_download_path, 'download.tmp') - if os.path.isfile(full_file_path): - logger.info('"{}.{}" already exists under the given path.'.format(file_name, file_extention)) - else: - if format == 'code': - logger.info('Downloading code for ebook: "{}"...'.format(book['title'])) - elif format == 'video': - logger.info('Downloading "{}" video...'.format(book['title'])) - else: - logger.info('Downloading ebook: "{}" in {} format...'.format(book['title'], format)) - try: - file_url = api_client.get(download_url).json().get('data') - r = api_client.get(file_url, timeout=100, stream=True) - if r.status_code is 200: - try: - with open(temp_file_path, 'wb') as f: - total_length = int(r.headers.get('content-length')) - num_of_chunks = (total_length / 1024) + 1 - for num, chunk in enumerate(r.iter_content(chunk_size=1024)): - if chunk: - if is_interactive: - PacktPublishingFreeEbook.update_download_progress_bar( - num / num_of_chunks - ) - f.write(chunk) - f.flush() - if is_interactive: - PacktPublishingFreeEbook.update_download_progress_bar(-1) # add end of line - os.rename(temp_file_path, full_file_path) - finally: - if os.path.isfile(temp_file_path): - os.remove(temp_file_path) - - if format == 'code': - logger.success('Code for 
ebook "{}" downloaded successfully!'.format(book['title'])) - else: - logger.success('Ebook "{}" in {} format downloaded successfully!'.format( - book['title'], - format - )) - nr_of_books_downloaded += 1 - else: - message = 'Couldn\'t download "{}" ebook in {} format.'.format(book['title'], format) - logger.error(message) - raise requests.exceptions.RequestException(message) - except Exception as e: - logger.error(e) - logger.info("{} ebooks have been downloaded!".format(str(nr_of_books_downloaded))) - - @staticmethod - def update_download_progress_bar(current_work_done): - """Prints progress bar, current_work_done should be float value in range {0.0 - 1.0}, else prints '\n'""" - if 0.0 <= current_work_done <= 1.0: - print( - "\r[PROGRESS] - [{0:50s}] {1:.1f}% ".format('#' * int(current_work_done * 50), current_work_done * 100), - end="", ) - else: - print("") +AVAILABLE_DOWNLOAD_FORMATS = ('pdf', 'mobi', 'epub', 'video', 'code') @click.command() @@ -285,12 +48,12 @@ def packt_cli(cfgpath, grab, grabd, dall, sgd, mail, status_mail, folder, noauth try: cfg = ConfigurationModel(config_file_path) - ebook = PacktPublishingFreeEbook(cfg) - api_client = PacktAPIClient(cfg.my_packt_email, cfg.my_packt_password) + product_data = None + api_client = PacktAPIClient(*cfg.packt_login_credentials) # Grab the newest book if grab or grabd or sgd or mail: - ebook.grab_ebook(api_client) + product_data = claim_product(api_client, cfg.anticaptcha_api_key) # Send email about successful book grab. Do it only when book # isn't going to be emailed as we don't want to send email twice. @@ -300,28 +63,36 @@ def packt_cli(cfgpath, grab, grabd, dall, sgd, mail, status_mail, folder, noauth mb.send_info( subject=SUCCESS_EMAIL_SUBJECT.format( dt.datetime.now().strftime(DATE_FORMAT), - ebook.book_data['title'] + product_data['title'] ), - body=SUCCESS_EMAIL_BODY.format(ebook.book_data['title']) + body=SUCCESS_EMAIL_BODY.format(product_data['title']) ) # Download book(s) into proper location. 
def solve_recaptcha(anticaptcha_key, website_url, website_key):
    """Return the ReCAPTCHA solution for a website, obtained with a one-off Anticaptcha client.

    Convenience wrapper: builds ``Anticaptcha`` from ``anticaptcha_key`` and
    delegates to its ``solve_recaptcha`` method with the given URL and site key.
    """
    return Anticaptcha(anticaptcha_key).solve_recaptcha(website_url, website_key)