-
Notifications
You must be signed in to change notification settings - Fork 186
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Read configuration file granularly (#160)
- Loading branch information
Showing
7 changed files
with
287 additions
and
256 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
import datetime as dt | ||
from itertools import chain | ||
from math import ceil | ||
|
||
from api import ( | ||
DEFAULT_PAGINATION_SIZE, | ||
PACKT_API_FREE_LEARNING_CLAIM_URL, | ||
PACKT_API_FREE_LEARNING_OFFERS_URL, | ||
PACKT_API_PRODUCTS_URL, | ||
PACKT_API_USER_URL, | ||
PACKT_PRODUCT_SUMMARY_URL | ||
) | ||
from utils.anticaptcha import solve_recaptcha | ||
from utils.logger import get_logger | ||
|
||
logger = get_logger(__name__) | ||
|
||
|
||
PACKT_FREE_LEARNING_URL = 'https://www.packtpub.com/packt/offers/free-learning/' | ||
PACKT_RECAPTCHA_SITE_KEY = '6LeAHSgUAAAAAKsn5jo6RUSTLVxGNYyuvUcLMe0_' | ||
|
||
|
||
def get_all_books_data(api_client): | ||
"""Fetch all user's ebooks data.""" | ||
logger.info("Getting your books data...") | ||
try: | ||
response = api_client.get(PACKT_API_PRODUCTS_URL) | ||
pages_total = int(ceil(response.json().get('count') / DEFAULT_PAGINATION_SIZE)) | ||
my_books_data = list(chain(*map( | ||
lambda page: get_single_page_books_data(api_client, page), | ||
range(pages_total) | ||
))) | ||
logger.info('Books data has been successfully fetched.') | ||
return my_books_data | ||
except (AttributeError, TypeError): | ||
logger.error('Couldn\'t fetch user\'s books data.') | ||
|
||
|
||
def get_single_page_books_data(api_client, page): | ||
"""Fetch ebooks data from single products API pagination page.""" | ||
try: | ||
response = api_client.get( | ||
PACKT_API_PRODUCTS_URL, | ||
params={ | ||
'sort': 'createdAt:DESC', | ||
'offset': DEFAULT_PAGINATION_SIZE * page, | ||
'limit': DEFAULT_PAGINATION_SIZE | ||
} | ||
) | ||
return [{'id': t['productId'], 'title': t['productName']} for t in response.json().get('data')] | ||
except Exception: | ||
logger.error('Couldn\'t fetch page {} of user\'s books data.'.format(page)) | ||
|
||
|
||
def claim_product(api_client, anticaptcha_key): | ||
"""Grab Packt Free Learning ebook.""" | ||
logger.info("Start grabbing ebook...") | ||
|
||
utc_today = dt.datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) | ||
offer_response = api_client.get( | ||
PACKT_API_FREE_LEARNING_OFFERS_URL, | ||
params={ | ||
'dateFrom': utc_today.isoformat(), | ||
'dateTo': (utc_today + dt.timedelta(days=1)).isoformat() | ||
} | ||
) | ||
[offer_data] = offer_response.json().get('data') | ||
offer_id = offer_data.get('id') | ||
product_id = offer_data.get('productId') | ||
|
||
user_response = api_client.get(PACKT_API_USER_URL) | ||
[user_data] = user_response.json().get('data') | ||
user_id = user_data.get('id') | ||
|
||
product_response = api_client.get(PACKT_PRODUCT_SUMMARY_URL.format(product_id=product_id)) | ||
product_data = {'id': product_id, 'title': product_response.json()['title']}\ | ||
if product_response.status_code == 200 else None | ||
|
||
if any(product_id == book['id'] for book in get_all_books_data(api_client)): | ||
logger.info('You have already claimed Packt Free Learning "{}" offer.'.format(product_data['title'])) | ||
return product_data | ||
|
||
logger.info('Started solving ReCAPTCHA on Packt Free Learning website...') | ||
recaptcha_solution = solve_recaptcha(anticaptcha_key, PACKT_FREE_LEARNING_URL, PACKT_RECAPTCHA_SITE_KEY) | ||
|
||
claim_response = api_client.put( | ||
PACKT_API_FREE_LEARNING_CLAIM_URL.format(user_id=user_id, offer_id=offer_id), | ||
json={'recaptcha': recaptcha_solution} | ||
) | ||
|
||
if claim_response.status_code == 200: | ||
logger.info('A new Packt Free Learning ebook "{}" has been grabbed!'.format(product_data['title'])) | ||
elif claim_response.status_code == 409: | ||
logger.info('You have already claimed Packt Free Learning "{}" offer.'.format(product_data['title'])) | ||
else: | ||
logger.error('Claiming Packt Free Learning book has failed.') | ||
|
||
return product_data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
import configparser | ||
import os | ||
|
||
from utils.logger import get_logger | ||
|
||
logger = get_logger(__name__) | ||
|
||
|
||
class ConfigurationModel(object): | ||
"""Contains all needed data stored in configuration file.""" | ||
|
||
def __init__(self, cfg_file_path): | ||
self.configuration = configparser.ConfigParser() | ||
self.configuration.read(cfg_file_path) | ||
|
||
@property | ||
def packt_login_credentials(self): | ||
"""Return Packt user login credentials.""" | ||
return self.configuration.get('LOGIN_DATA', 'email'), self.configuration.get('LOGIN_DATA', 'password') | ||
|
||
@property | ||
def anticaptcha_api_key(self): | ||
"""Return AntiCaptcha API key.""" | ||
return self.configuration.get("ANTICAPTCHA_DATA", 'key') | ||
|
||
@property | ||
def config_download_data(self): | ||
"""Return download configuration data.""" | ||
download_path = self.configuration.get("DOWNLOAD_DATA", 'download_folder_path') | ||
if not os.path.exists(download_path): | ||
message = "Download folder path: '{}' doesn't exist".format(download_path) | ||
logger.error(message) | ||
raise ValueError(message) | ||
download_formats = tuple(form.replace(' ', '') for form in | ||
self.configuration.get("DOWNLOAD_DATA", 'download_formats').split(',')) | ||
return download_path, download_formats |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
import os | ||
import sys | ||
|
||
import requests | ||
from requests.exceptions import ConnectionError | ||
from slugify import slugify | ||
|
||
from api import ( | ||
PACKT_API_PRODUCT_FILE_DOWNLOAD_URL, | ||
PACKT_API_PRODUCT_FILE_TYPES_URL | ||
) | ||
from utils.logger import get_logger | ||
|
||
|
||
logger = get_logger(__name__) | ||
|
||
|
||
class PacktConnectionError(ConnectionError): | ||
"""Error raised whenever fetching data from Packt API fails.""" | ||
pass | ||
|
||
|
||
def slugify_product_name(title): | ||
"""Return book title with spaces replaced by underscore and unicodes replaced by characters valid in filenames.""" | ||
return slugify(title, separator='_', lowercase=False) | ||
|
||
|
||
def get_product_download_urls(api_client, product_id): | ||
error_message = 'Couldn\'t fetch download URLs for product {}.'.format(product_id) | ||
try: | ||
response = api_client.get(PACKT_API_PRODUCT_FILE_TYPES_URL.format(product_id=product_id)) | ||
if response.status_code == 200: | ||
return { | ||
format: PACKT_API_PRODUCT_FILE_DOWNLOAD_URL.format(product_id=product_id, file_type=format) | ||
for format in response.json().get('data')[0].get('fileTypes') | ||
} | ||
else: | ||
logger.info(error_message) | ||
return {} | ||
except Exception: | ||
raise PacktConnectionError(error_message) | ||
|
||
|
||
def download_products(api_client, download_directory, formats, product_list, into_folder=False): | ||
"""Download selected products.""" | ||
nr_of_books_downloaded = 0 | ||
is_interactive = sys.stdout.isatty() | ||
for book in product_list: | ||
download_urls = get_product_download_urls(api_client, book['id']) | ||
for format, download_url in download_urls.items(): | ||
if format in formats and not (format == 'code' and 'video' in download_urls and 'video' in formats): | ||
file_extention = 'zip' if format in ('video', 'code') else format | ||
file_name = slugify_product_name(book['title']) | ||
logger.info('Title: "{}"'.format(book['title'])) | ||
if into_folder: | ||
target_download_path = os.path.join(download_directory, file_name) | ||
if not os.path.isdir(target_download_path): | ||
os.mkdir(target_download_path) | ||
else: | ||
target_download_path = os.path.join(download_directory) | ||
full_file_path = os.path.join(target_download_path, '{}.{}'.format(file_name, file_extention)) | ||
temp_file_path = os.path.join(target_download_path, 'download.tmp') | ||
if os.path.isfile(full_file_path): | ||
logger.info('"{}.{}" already exists under the given path.'.format(file_name, file_extention)) | ||
else: | ||
if format == 'code': | ||
logger.info('Downloading code for ebook: "{}"...'.format(book['title'])) | ||
elif format == 'video': | ||
logger.info('Downloading "{}" video...'.format(book['title'])) | ||
else: | ||
logger.info('Downloading ebook: "{}" in {} format...'.format(book['title'], format)) | ||
try: | ||
file_url = api_client.get(download_url).json().get('data') | ||
r = api_client.get(file_url, timeout=100, stream=True) | ||
if r.status_code is 200: | ||
try: | ||
with open(temp_file_path, 'wb') as f: | ||
total_length = int(r.headers.get('content-length')) | ||
num_of_chunks = (total_length / 1024) + 1 | ||
for num, chunk in enumerate(r.iter_content(chunk_size=1024)): | ||
if chunk: | ||
if is_interactive: | ||
update_download_progress_bar(num / num_of_chunks) | ||
f.write(chunk) | ||
f.flush() | ||
if is_interactive: | ||
update_download_progress_bar(-1) # add end of line | ||
os.rename(temp_file_path, full_file_path) | ||
finally: | ||
if os.path.isfile(temp_file_path): | ||
os.remove(temp_file_path) | ||
|
||
if format == 'code': | ||
logger.success('Code for ebook "{}" downloaded successfully!'.format(book['title'])) | ||
else: | ||
logger.success('Ebook "{}" in {} format downloaded successfully!'.format( | ||
book['title'], | ||
format | ||
)) | ||
nr_of_books_downloaded += 1 | ||
else: | ||
message = 'Couldn\'t download "{}" ebook in {} format.'.format(book['title'], format) | ||
logger.error(message) | ||
raise requests.exceptions.RequestException(message) | ||
except Exception as e: | ||
logger.error(e) | ||
logger.info("{} ebooks have been downloaded!".format(str(nr_of_books_downloaded))) | ||
|
||
|
||
def update_download_progress_bar(current_work_done): | ||
"""Prints progress bar, current_work_done should be float value in range {0.0 - 1.0}, else prints '\n'""" | ||
if 0.0 <= current_work_done <= 1.0: | ||
print( | ||
"\r[PROGRESS] - [{0:50s}] {1:.1f}% ".format('#' * int(current_work_done * 50), current_work_done * 100), | ||
end="", ) | ||
else: | ||
print("") |
Oops, something went wrong.