From d05aefb47d1752bc2b2f000a67322dfd20782b71 Mon Sep 17 00:00:00 2001
From: Andrew Anker
Date: Sun, 13 Nov 2022 09:33:01 -0800
Subject: [PATCH] Refactor to classes in external module

Pulls WordList and WebExtract out of the main xwordlist.py and into
xwl.py, which allows xwl.py functionality to be used in other programs.
Also creates XWLException to raise errors back to the main program
rather than exiting from inside library code.

Also fixes an issue where --minimum wasn't checking that it received a
proper argument.
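As an illustration of the reuse this enables, a downstream script can
now do something like the following (a hypothetical example, not part
of this patch; the word list and color are made up):

    import xwl

    words = xwl.WordList(myList=['alpha', 'Beta!!', 'be', 'ALPHA'], color='ansired')
    try:
        words.strip('diacritic')     # keep alphabetic characters only
        words.minimum(3)             # drop entries shorter than 3 letters
        words.dedupe('nocase')       # case-insensitive de-duplication
        words.alphabetize('normal')
    except xwl.XWLException as e:
        print(e)                     # the library raises; the caller decides
    print(words.myList)              # ['alpha', 'Beta']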
---
 __init__.py  |   2 +
 xwl.py       | 195 +++++++++++++++++++++++++++++++
 xwordlist.py | 318 ++++++++++++++------------------------------------
 3 files changed, 280 insertions(+), 235 deletions(-)
 create mode 100644 __init__.py
 create mode 100644 xwl.py

diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..462adce
--- /dev/null
+++ b/__init__.py
@@ -0,0 +1,2 @@
+from xwl import WordList, WebExtract, XWLException
+from xwordlist import main
diff --git a/xwl.py b/xwl.py
new file mode 100644
index 0000000..06e3956
--- /dev/null
+++ b/xwl.py
@@ -0,0 +1,195 @@
+import re
+import requests
+import urllib.parse
+
+from bs4 import BeautifulSoup
+from anyascii import anyascii
+
+
+class WordList:
+    def __init__(self, myList=[], color=''):
+        self.myList = myList
+        self.color = color
+
+    # List transformation options
+    def minimum(self, numChars):
+        if isinstance(numChars, int) or numChars.isdigit():
+            newList = []
+            for line in self.myList:
+                if line and len(line) >= int(numChars):
+                    newList.append(line)
+            self.myList = newList
+        else:
+            error = f'Exiting... argument for minimum must be an integer, not {self.err_text(numChars)}'
+            raise XWLException(error)
+
+    def strip(self, stripWhat):
+        newList = []
+        if stripWhat in ['keepdiacritic', 'diacritic']:
+            for word in self.myList:
+                if stripWhat != 'keepdiacritic':
+                    word = anyascii(word)
+                newWord = ''.join([i for i in word if i.isalpha()])
+                if len(newWord) > 0:
+                    newList.append(newWord)
+            self.myList = newList
+        else:
+            error = f'Exiting... unknown strip option {self.err_text(stripWhat)}'
+            raise XWLException(error)
+
+    def case(self, newCase):
+        case_dict = {
+            'upper': str.upper,
+            'lower': str.lower,
+        }
+        if newCase in case_dict:
+            self.myList = list(case_dict[newCase](line) for line in self.myList)
+        elif newCase != 'none':
+            error = f'Exiting... unknown case option {self.err_text(newCase)}'
+            raise XWLException(error)
+
+    def dedupe(self, dedupeType):
+        if dedupeType == 'bycase':
+            self.myList = list(dict.fromkeys(self.myList))
+        elif dedupeType == 'nocase':
+            newList = []
+            newSet = set()
+            for line in self.myList:
+                newLine = line.casefold()
+                if newLine not in newSet:
+                    newSet.add(newLine)
+                    newList.append(line)
+            self.myList = newList
+        else:
+            error = f'Exiting... incorrect dedupe option {self.err_text(dedupeType)}'
+            raise XWLException(error)
+
+    def alphabetize(self, direction):
+        if direction == 'normal':
+            self.myList = sorted(self.myList, key=str.casefold)
+        elif direction == 'reverse':
+            self.myList = sorted(self.myList, key=str.casefold, reverse=True)
+        else:
+            error = f'Exiting... incorrect alphabetize option {self.err_text(direction)}'
+            raise XWLException(error)
+
+    # Content parsing options
+    def regex(self, regexInput):
+        try:
+            newList = []
+            for line in self.myList:
+                newList.extend(re.findall(regexInput, line))
+            self.myList = newList
+
+        except re.error:
+            error = f'Regex pattern {self.err_text(regexInput)} not valid, please check and try again'
+            raise XWLException(error)
+
+        except Exception as e:
+            error = f'Error {self.err_text(e)}'
+            raise XWLException(error)
+
+    def convert(self, parseChars):
+        # First trap for problem where defaults are assumed but not specified
+        for char in parseChars:
+            newList = []
+            for line in self.myList:
+                if line.find(char) != -1:
+                    newList.extend(line.split(char))
+                else:
+                    newList.append(line)
+            self.myList = newList
+
+    def err_text(self, text):
+        if self.color:
+            return f'<{self.color}>{text}</{self.color}>'
+        else:
+            return text
+
+
+class WebExtract:
+    def __init__(self, color=''):
+        self.returnWords = []
+        self.scrapeWords = []
+        self.color = color
+
+    def get_web_page(self, webURL, parseDict, webExtract):
+        try:
+            r = requests.get(webURL)
+            if r.status_code == 200:
+                inputSoup = BeautifulSoup(r.text, 'html.parser')
+                if parseDict:
+                    # See if we have a class, in which case, have to do more screening (1 to N classes)
+                    if 'class' in parseDict:
+                        classDict = {}
+                        classDict['class'], whichNum = parseDict['class']
+                        fullSoup = inputSoup.find_all(attrs=classDict)
+                        for counter, whichSoup in enumerate(fullSoup, start=1):
+                            if whichNum == counter or whichNum == 0:
+                                self._extract_from_web(webExtract, whichSoup, webURL)
+                                self.returnWords.extend(self.scrapeWords)
+                    else:
+                        self._extract_from_web(webExtract, inputSoup.find(attrs=parseDict), webURL)
+                        self.returnWords.extend(self.scrapeWords)
+                else:
+                    self._extract_from_web(webExtract, inputSoup, webURL)
+                    self.returnWords.extend(self.scrapeWords)
+
+            elif r.status_code == 403:
+                error = 'Unable to load webpage due to code 403: this usually means we are being blocked'
+                raise XWLException(error)
+
+            else:
+                error = f'Unable to load webpage, status code = {self.err_text(r.status_code)}'
+                raise XWLException(error)
+
+        except XWLException:
+            raise
+
+        except (requests.URLRequired, requests.RequestException):
+            error = f'No content retrieved, error URL: {self.err_text(webURL)}'
+            raise XWLException(error)
+
+        except AttributeError:
+            error = f'HTML attribute {self.err_text(parseDict)} not found, check document and try again'
+            raise XWLException(error)
+
+        except Exception as e:
+            error = f'Web error {self.err_text(e)}'
+            raise XWLException(error)
+
+    def _extract_from_web(self, extractWhat, soup, extractURL):
+        # A few ways the default option can come in, try that first
+        if extractWhat == 'text' or extractWhat is None:
+            self.scrapeWords = soup.stripped_strings
+        elif extractWhat == 'links':
+            for link in soup.find_all('a'):
+                getURL = link.get('href')
+                if getURL:
+                    parsePieces = urllib.parse.urlsplit(getURL)
+                    # Check to see if absolute or relative URL. If relative, make it absolute
+                    if parsePieces.scheme == '' and parsePieces.netloc == '':
+                        parseExtract = urllib.parse.urlsplit(extractURL)
+                        getURL = urllib.parse.urljoin(f'{parseExtract.scheme}://{parseExtract.netloc}', getURL)
+                    self.scrapeWords.append(getURL)
+        elif extractWhat[:5] == 'html-':
+            extractTags = extractWhat[5:].split('_')
+            for tag in extractTags:
+                for link in soup.find_all(tag):
+                    text = link.get_text()
+                    self.scrapeWords.append(text)
+        else:
+            error = f'Exiting... incorrect option for webextract: {self.err_text(extractWhat)}'
+            raise XWLException(error)
+
+        self.scrapeWords = list(line for line in self.scrapeWords)
+
+    def err_text(self, text):
+        if self.color:
+            return f'<{self.color}>{text}</{self.color}>'
+        else:
+            return text
+
+
+class XWLException(Exception):
+    pass
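A quick sketch of driving the new WebExtract class directly (the URL is
illustrative only; an empty parseDict means the whole page is parsed,
just as xwordlist.py does when no --container is given):

    import xwl

    scraper = xwl.WebExtract(color='ansired')
    try:
        # Collect every link target on the page; relative hrefs come back absolute
        scraper.get_web_page('https://example.com/', {}, 'links')
        print(scraper.returnWords)
    except xwl.XWLException as e:
        print(e)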
diff --git a/xwordlist.py b/xwordlist.py
index efc26ec..89e89e0 100755
--- a/xwordlist.py
+++ b/xwordlist.py
@@ -3,15 +3,12 @@
 import configargparse
 import os
 import sys
-import requests
 import pathlib
 import urllib.parse
 import time
-import re
+import xwl
 
 from prompt_toolkit import prompt, print_formatted_text, HTML
-from bs4 import BeautifulSoup
-from anyascii import anyascii
 
 from importlib.metadata import version
 
@@ -55,101 +52,6 @@
     'ansibrightyellow', 'ansibrightblue', 'ansibrightmagenta', 'ansibrightcyan', 'ansiwhite']
 
 
-class WordList:
-    def __init__(self, myList=[]):
-        self.myList = myList
-
-    # List transformation options
-    def minimum(self, numChars):
-        newList = []
-        for line in self.myList:
-            if line and len(line) >= int(numChars):
-                newList.append(line)
-        self.myList = newList
-
-    def strip(self, stripWhat):
-        newList = []
-        if stripWhat in ['keepdiacritic', 'diacritic']:
-            for word in self.myList:
-                if stripWhat != 'keepdiacritic':
-                    word = anyascii(word)
-                newWord = ''.join([i for i in word if i.isalpha()])
-                if len(newWord) > 0:
-                    newList.append(newWord)
-            self.myList = newList
-        else:
-            print_text = 'Exiting... unknown strip option <{color}>{stripWhat}</{color}>'
-            print_line(print_text, {'stripWhat': stripWhat})
-            sys.exit()
-
-    def case(self, newCase):
-        case_dict = {
-            'upper': str.upper,
-            'lower': str.lower,
-        }
-        if newCase in case_dict:
-            self.myList = list(case_dict[newCase](line) for line in self.myList)
-        elif newCase != 'none':
-            print_text = 'Exiting... unknown case option <{color}>{newcase}</{color}>'
-            print_line(print_text, {'newcase': newCase})
-            sys.exit()
-
-    def dedupe(self, dedupeType):
-        if dedupeType == 'bycase':
-            self.myList = list(dict.fromkeys(self.myList))
-        elif dedupeType == 'nocase':
-            newList = []
-            newSet = set()
-            for line in self.myList:
-                newLine = line.casefold()
-                if newLine not in newSet:
-                    newSet.add(newLine)
-                    newList.append(line)
-            self.myList = newList
-        else:
-            print_text = 'Exiting... incorrect dedupe option <{color}>{dedupeType}</{color}>'
-            print_line(print_text, {'dedupeType': dedupeType})
-            sys.exit()
-
-    def alphabetize(self, direction):
-        if direction == 'normal':
-            self.myList = sorted(self.myList, key=str.casefold)
-        elif direction == 'reverse':
-            self.myList = sorted(self.myList, key=str.casefold, reverse=True)
-        else:
-            print_text = 'Exiting... incorrect alphabetize option <{color}>{direction}</{color}>'
-            print_line(print_text, {'direction': direction})
-            sys.exit()
-
-    # Content parsing options
-    def regex(self, regexInput):
-        try:
-            newList = []
-            for line in self.myList:
-                newList.extend(re.findall(regexInput, line))
-            self.myList = newList
-
-        except re.error:
-            error_text = 'Regex pattern <{color}>{regexInput}</{color}> not valid, please check and try again'
-            print_line(error_text, {'regexInput': regexInput})
-            sys.exit()
-
-        except Exception as e:
-            print_line('Error {e}', {'e': e})
-            sys.exit()
-
-    def convert(self, parseChars):
-        # First trap for problem where defaults are assumed but not specified
-        for char in parseChars:
-            newList = []
-            for line in self.myList:
-                if line.find(char) != -1:
-                    newList.extend(line.split(char))
-                else:
-                    newList.append(line)
-            self.myList = newList
-
-
 def print_line(printText, argument={}, endText='\n'):
     print_formatted_text(HTML(printText.format(**argument, color=IMPACT_COLOR)), end=endText)
     return
@@ -182,77 +84,6 @@ def create_dict(localAttrs):
     sys.exit()
 
 
-def extract_from_web(extractWhat, soup, extractURL):
-    # A few ways the default option can come in, try that first
-    if extractWhat == 'text' or extractWhat is None:
-        localWords = soup.stripped_strings
-    elif extractWhat == 'links':
-        localWords = []
-        for link in soup.find_all('a'):
-            getURL = link.get('href')
-            if getURL:
-                parsePieces = urllib.parse.urlsplit(getURL)
-                # Check to see if absolute or relative URL. If relative, make it absolute
-                if parsePieces.scheme == '' and parsePieces.netloc == '':
-                    parseExtract = urllib.parse.urlsplit(extractURL)
-                    getURL = urllib.parse.urljoin(f'{parseExtract.scheme}://{parseExtract.netloc}', getURL)
-                localWords.append(getURL)
-    elif extractWhat[:5] == 'html-':
-        localWords = []
-        extractTags = extractWhat[5:].split('_')
-        for tag in extractTags:
-            for link in soup.find_all(tag):
-                text = link.get_text()
-                localWords.append(text)
-    else:
-        print_text = 'Exiting... incorrect option for webextract: <{color}>{extractWhat}</{color}>'
-        print_line(print_text, {'extractWhat': extractWhat})
-        sys.exit()
-
-    return list(line for line in localWords)
-
-
-def get_web_page(webURL, containerParse, webExtract):
-    try:
-        r = requests.get(webURL)
-        if r.status_code == 200:
-            inputSoup = BeautifulSoup(r.text, 'html.parser')
-            if containerParse:
-                parseDict = create_dict(containerParse)
-                # See if we have a class, in which case, have to do more screening (1 to N classes)
-                if 'class' in parseDict:
-                    classDict = {}
-                    classDict['class'], whichNum = parseDict['class']
-                    returnWords = []
-                    fullSoup = inputSoup.find_all(attrs=classDict)
-                    for counter, whichSoup in enumerate(fullSoup, start=1):
-                        if whichNum == counter or whichNum == 0:
-                            returnWords.extend(extract_from_web(webExtract, whichSoup, webURL))
-                    return returnWords
-                else:
-                    return extract_from_web(webExtract, inputSoup.find(attrs=parseDict), webURL)
-            else:
-                return extract_from_web(webExtract, inputSoup, webURL)
-        elif r.status_code == 403:
-            print_line('Unable to load webpage due to code 403: this usually means we are being blocked')
-
-        else:
-            print_text = 'Unable to load webpage, status code = {statusCode}'
-            print_line(print_text, {'statusCode': r.status_code})
-
-    except (requests.URLRequired, requests.RequestException):
-        return False
-
-    except AttributeError:
-        error_text = 'HTML attribute <{color}>{containerParse}</{color}> not found, check document and try again'
-        print_line(error_text, {'containerParse': containerParse})
-        sys.exit()
-
-    except Exception as e:
-        print_line('Web error {e}', {'e': e})
-        sys.exit()
-
-
 def get_file_content(inputFile):
     try:
         with open(inputFile, 'r') as inputOpen:
@@ -287,38 +118,48 @@ def save_output(localOuput, localWords):
 
 def setup_output(localArgs, otherArgs):
     file_add = otherArgs['file_add'] if 'file_add' in otherArgs else GLOBAL_SETTINGS['file_add']
 
-    # Has an output file been specified? If not, create from either file or URL
-    if localArgs.output:
-        outputFile = localArgs.output
-    elif localArgs.input or localArgs.urllist:
-        fileName = localArgs.input[0] if localArgs.input else localArgs.urllist
-        filePieces = os.path.splitext(fileName)
-        outputFile = f'{filePieces[0]}_{file_add}{filePieces[1]}'
-    elif localArgs.webpage:
-        urlPieces = urllib.parse.urlparse(localArgs.webpage).hostname.split('.')
-        if len(urlPieces) > 1:
-            outputFile = f'{urlPieces[-2]}_{urlPieces[-1]}_{file_add}.txt'
+    try:
+        # Has an output file been specified? If not, create from either file or URL
+        if localArgs.output:
+            outputFile = localArgs.output
+        elif localArgs.input or localArgs.urllist:
+            fileName = localArgs.input[0] if localArgs.input else localArgs.urllist
+            filePieces = os.path.splitext(fileName)
+            outputFile = f'{filePieces[0]}_{file_add}{filePieces[1]}'
+        elif localArgs.webpage:
+            urlPieces = urllib.parse.urlparse(localArgs.webpage).hostname.split('.')
+            if len(urlPieces) > 1:
+                outputFile = f'{urlPieces[-2]}_{urlPieces[-1]}_{file_add}.txt'
+            else:
+                outputFile = f'{urlPieces[0]}_{file_add}.txt'
+            # See if a directory has been specified: don't need to check if valid by now
+            if localArgs.directory is not None:
+                outputFile = os.path.join(localArgs.directory, outputFile)
         else:
-            outputFile = f'{urlPieces[0]}_{file_add}.txt'
-        # See if a directory has been specified: don't need to check if valid by now
-        if localArgs.directory is not None:
-            outputFile = os.path.join(localArgs.directory, outputFile)
-    else:
-        # Nothing to set up
-        return
-
-    # Now check if the output file exists and prompt user if it does
-    if pathlib.Path(outputFile).is_file():
-        prompt_text = 'Output file named <{color}>{output}</{color}> already exists. Overwrite? (Y/N): '
-        text = prompt(HTML(prompt_text.format(color=IMPACT_COLOR, output=outputFile)))
-        if text != 'Y' and text != 'y':
-            print_line('Exiting... please enter a different file name at the command line')
-            sys.exit()
-    return outputFile
+            # Nothing to set up
+            return
+
+        # Now check if the output file exists and prompt user if it does
+        if pathlib.Path(outputFile).is_file():
+            prompt_text = 'Output file named <{color}>{output}</{color}> already exists. Overwrite? (Y/N): '
+            text = prompt(HTML(prompt_text.format(color=IMPACT_COLOR, output=outputFile)))
+            if text != 'Y' and text != 'y':
+                print_line('Exiting... please enter a different file name at the command line')
+                sys.exit()
+        return outputFile
+
+    except AttributeError:
+        print_line('Exiting... problem setting up output file. Probably a problem with the URL you entered')
+        sys.exit()
+
+    except Exception as e:
+        print_line('Exiting... setup_output error {e}', {'e': e})
+        sys.exit()
 
 
 def setup_input(localArgs, otherArgs):
     returnWords = []
+    parseDict = {}
 
     # Load input(s)
     if localArgs.input:
@@ -328,14 +169,18 @@ def setup_input(localArgs, otherArgs):
             returnWords.extend(fileWords)
 
     if localArgs.webpage:
-        webWords = get_web_page(localArgs.webpage, localArgs.container, localArgs.webextract)
-        if webWords:
-            returnWords.extend(webWords)
+        if localArgs.container:
+            parseDict = create_dict(localArgs.container)
+        webScrape = xwl.WebExtract(color=IMPACT_COLOR)
+        webScrape.get_web_page(localArgs.webpage, parseDict, localArgs.webextract)
+        returnWords.extend(webScrape.returnWords)
 
     if localArgs.urllist:
         urlList = get_file_content(localArgs.urllist)
        if urlList:
             urlLength = len(urlList)
+            if localArgs.container:
+                parseDict = create_dict(localArgs.container)
 
             for urlCount, oneUrl in enumerate(urlList, start=1):
                 print_text = 'Getting <{color}>{oneUrl}</{color}> ({urlCount} of {urlLength})'
@@ -345,17 +190,15 @@
                 arg_dict = {
                     'oneUrl': oneUrl,
                     'urlCount': urlCount,
                     'urlLength': urlLength,
                 }
                 print_line(print_text, arg_dict, endText='')
-                webWords = get_web_page(oneUrl, localArgs.container, localArgs.webextract)
-                if webWords:
-                    returnWords.extend(webWords)
-                    if urlCount < urlLength:
-                        delay = int(otherArgs['urllist_delay']) if 'urllist_delay' in otherArgs else GLOBAL_SETTINGS['urllist_delay']
-                        print_line(' ...done ...sleeping {delay} seconds', {'delay': delay})
-                        time.sleep(delay)
-                    else:
-                        print_line(' ...done')
+                webScrape = xwl.WebExtract(color=IMPACT_COLOR)
+                webScrape.get_web_page(oneUrl, parseDict, localArgs.webextract)
+                returnWords.extend(webScrape.returnWords)
+                if urlCount < urlLength:
+                    delay = int(otherArgs['urllist_delay']) if 'urllist_delay' in otherArgs else GLOBAL_SETTINGS['urllist_delay']
+                    print_line(' ...done ...sleeping {delay} seconds', {'delay': delay})
+                    time.sleep(delay)
                 else:
-                    print_line(' ...no content retrieved, was that a URL?')
+                    print_line(' ...done')
 
     if len(returnWords) == 0:
         print_text = 'No input given, nothing to do (enter <{color}>{exec_name} -h</{color}> for help)'
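One convention worth calling out in the hunks above: in the parseDict
that xwl.WebExtract.get_web_page consumes (normally built by create_dict
from the --container argument), a 'class' entry maps to a (name, number)
pair, where the number selects the Nth matching element and 0 means all
of them. A hand-built dict stands in for create_dict here, and the URL
and class name are made up:

    import xwl

    scraper = xwl.WebExtract(color='ansired')
    try:
        # Scrape text from only the second element with class="entry"
        scraper.get_web_page('https://example.com/words', {'class': ('entry', 2)}, 'text')
        print(scraper.returnWords)
    except xwl.XWLException as e:
        print(e)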
@@ -523,31 +366,36 @@ def main():
 
     # Set up output file to make sure it doesn't exist then grab all inputs (including web parsing)
     outputFile = setup_output(confArgs, envArgs)
-    inputWords = WordList(setup_input(confArgs, envArgs))
-
-    # Do any text parsing
-    if confArgs.regex is not None:
-        if confArgs.regex[0] == 'true':
-            regex_error = 'Exiting... no regex pattern given, please use format '
-            regex_error += '<{color}>regex PATTERN</{color}> in configuration file'
-            print_line(regex_error)
-            sys.exit()
-        inputWords.regex(confArgs.regex[0])
-
-    # First figure out if they set either combination option
-    if confArgs.line2word or confArgs.word2word:
-        inputWords, thatOption = do_word_options(inputWords, confArgs)
-    else:
-        thatOption = False
-
-    # Now run through remaining options
-    # We also trap for the case where the user had one of the line2word or word2word options
-    # but also uses one of the duplicative other options (we ignore the duplicative one)
-    optionList = ['convert', 'strip', 'minimum', 'case', 'dedupe', 'alphabetize']
-    for option in optionList:
-        confOption = getattr(confArgs, option)
-        if confOption is not None:
-            do_ignore(option, thatOption) if thatOption else getattr(inputWords, option)(confOption)
+    try:
+        inputWords = xwl.WordList(myList=setup_input(confArgs, envArgs), color=IMPACT_COLOR)
+
+        # Do any text parsing
+        if confArgs.regex is not None:
+            if confArgs.regex[0] == 'true':
+                regex_error = 'Exiting... no regex pattern given, please use format '
+                regex_error += '<{color}>regex PATTERN</{color}> in configuration file'
+                print_line(regex_error)
+                sys.exit()
+            inputWords.regex(confArgs.regex[0])
+
+        # First figure out if they set either combination option
+        if confArgs.line2word or confArgs.word2word:
+            inputWords, thatOption = do_word_options(inputWords, confArgs)
+        else:
+            thatOption = False
+
+        # Now run through remaining options
+        # We also trap for the case where the user had one of the line2word or word2word options
+        # but also uses one of the duplicative other options (we ignore the duplicative one)
+        optionList = ['convert', 'strip', 'minimum', 'case', 'dedupe', 'alphabetize']
+        for option in optionList:
+            confOption = getattr(confArgs, option)
+            if confOption is not None:
+                do_ignore(option, thatOption) if thatOption else getattr(inputWords, option)(confOption)
+
+    except xwl.XWLException as e:
+        print_formatted_text(HTML(e))
+        sys.exit()
 
     # Now save the file
     save_output(outputFile, inputWords)
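And a sketch of the --minimum fix in isolation (values made up): a bad
argument now surfaces as a catchable XWLException, rather than the
unhandled ValueError the old code hit when it called int() on it:

    import xwl

    words = xwl.WordList(myList=['crossword', 'fill'], color='ansired')
    try:
        words.minimum('abc')   # not an integer, and isdigit() is False
    except xwl.XWLException as e:
        print(e)   # Exiting... argument for minimum must be an integer, not <ansired>abc</ansired>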