From 1ea701a7cc9526f4b1070bb7cfbb628bcaafc795 Mon Sep 17 00:00:00 2001 From: Tim Reibe <38082956+iamnotturner@users.noreply.github.com> Date: Wed, 30 Jun 2021 10:18:32 +0200 Subject: [PATCH] feat(uc): added undetected chromedriver (#515) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * - Fix creation of random code (#499) Co-authored-by: Juri * feat(uc): Use undetected chromedriver (#497) * Varied sleep time for each call, to make it less predictable and more user-like * Use a random, real useragent * Use a random, real useragent from fake-useragent * Wait some time between clicks * Force some minimum sleeptime for random_sleeps * - Use random_sleep more often - Replace 'VACC-IPYx-xxxx' with complete random code * # Refactored mouse movements # Use computed movements more often # Wiggle a bit after reaching the correct values * # Removed fake-useragent because it was unreliant * # First test for replacing normal chromedriver with undetected-chromedriver * # First test for replacing normal chromedriver with undetected-chromedriver * # Remove selenium-wire requests from selenium_code_anfordern, by @Jonas * - Removed mouse movements by @JonasMock - Fixed annotations * - Close driver after successfull code request Co-authored-by: Juri * added chromedriver again (#502) * fix(readme): Termineingrenzung Feature in Readme eingefügt. (#508) * feat(local-chromium): replacing chromedriver binaries with google chrome or custom chromium installation (#509) * disabled local chromedriver, replaced with google chrome or custom chromium installation * option 3: set environment variables * returning webdriver exectuable as a string * fix(args): add missing configure_notifications argument (#514) * Update README.md * add missing configure_notifications argument This led to errors when running `python3 main.py code --configure-only -f max-mustermann.json`. Co-authored-by: Julius Jacobitz <47418007+JuliusJacobitz@users.noreply.github.com> Co-authored-by: Genmutant Co-authored-by: Juri Co-authored-by: Linus Schaub Co-authored-by: Christian Jülg Co-authored-by: Julius Jacobitz <47418007+JuliusJacobitz@users.noreply.github.com> --- README.md | 2 +- main.py | 17 +- requirements.txt | 3 +- tools/its.py | 652 ++++++++++++++++++-------------------------- tools/mousemover.py | 35 ++- tools/utils.py | 2 - 6 files changed, 312 insertions(+), 399 deletions(-) diff --git a/README.md b/README.md index 1e74a5cb..60194b48 100644 --- a/README.md +++ b/README.md @@ -328,7 +328,7 @@ Die Distributionen können im [neusten Release heruntergeladen werden](https://g Es gibt noch ein paar Features, die cool wären. Die Ideen werden hier mal gesammelt und werden (von uns oder euch - feel free!) irgendwann hinzukommen: -- [ ] Datum eingrenzen bei der Terminwahl +- [x] Datum eingrenzen bei der Terminwahl - [ ] Github Pages - [ ] Integrierter updater. - [ ] Macosx Build / Pipeline (Mac currently blocks the app: [Branch](https://github.com/iamnotturner/vaccipy/tree/mac-intel-build)) diff --git a/main.py b/main.py index bddcae8e..48d97917 100755 --- a/main.py +++ b/main.py @@ -367,6 +367,11 @@ def gen_code(kontaktdaten): its = ImpfterminService([], {}, PATH) + # Einmal Chrome starten, um früh einen Fehler zu erzeugen, falls die + # erforderliche Software nicht installiert ist. + its.log.info("Prüfen von Chromium und Chromedriver") + its.get_chromedriver(headless=True).quit() + print("\nBitte trage nachfolgend dein Geburtsdatum im Format DD.MM.YYYY ein.\n" "Beispiel: 02.03.1982\n") while True: @@ -404,7 +409,7 @@ def subcommand_search(args): def subcommand_code(args): if args.configure_only: update_kontaktdaten_interactive( - get_kontaktdaten(args.file), "code", args.file) + get_kontaktdaten(args.file), "code", args.configure_notifications, args.file) elif args.read_only: gen_code(get_kontaktdaten(args.file)) else: @@ -564,6 +569,16 @@ def main(): else: print("Falscheingabe! Bitte erneut versuchen.") print() + except TypeError as exc: + if str(exc) == "expected str, bytes or os.PathLike object, not NoneType": + print("\nChromium nicht gefunden. Drei Möglichkeiten zur Problembehebung:\n" + "1) Google Chrome installieren: https://www.google.com/intl/de_de/chrome/\n" + "2) Chromium über das Vaccipy-Menü installieren: " + "'[3] Eigene Chromium Instanz im Vaccipy Ordner installieren'\n" + "3) Pfad für Chromium und Chromedriver über Umgebungsvariablen festlegen: " + "VACCIPY_CHROME_BIN (Chromium) und VACCIPY_CHROMEDRIVER (Chromedriver)\n") + else: + print(f"\nUnbekannter TypeError:\n{str(exc)}\n") except Exception as exc: print(f"\nFehler:\n{str(exc)}\n") diff --git a/requirements.txt b/requirements.txt index 986e4882..64cae9a1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,4 +11,5 @@ PyQt5-Qt5==5.15.2 PyQt5-sip==12.9.0 requests>=2.25.1 tqdm>=4.61.0 -selenium-wire>=4.3.1 \ No newline at end of file +selenium-wire>=4.3.1 +undetected-chromedriver>=3.0.1 \ No newline at end of file diff --git a/tools/its.py b/tools/its.py index 287f13c3..504cbef2 100644 --- a/tools/its.py +++ b/tools/its.py @@ -1,11 +1,11 @@ # Alphabetisch sortiert: import copy -import json import os import platform +import random import string -import sys import time + # Alphabetisch sortiert: from base64 import b64encode from datetime import datetime, date, timedelta @@ -13,25 +13,25 @@ from itertools import cycle from json import JSONDecodeError from random import choice, choices, randint +from typing import Optional, Tuple, Dict import cloudscraper +import undetected_chromedriver.v2 as uc from requests.exceptions import RequestException from selenium.common.exceptions import WebDriverException from selenium.webdriver import ActionChains -from selenium.webdriver import Chrome -from selenium.webdriver.chrome.options import Options +from selenium.webdriver.chrome.webdriver import WebDriver from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.ui import WebDriverWait -from seleniumwire import webdriver as selenium_wire -from tools.chromium_downloader import chromium_executable, check_chromium, webdriver_executable, \ - check_webdriver +from tools.chromium_downloader import webdriver_executable, \ + check_webdriver, chromium_executable, check_chromium from tools.clog import CLogger from tools.exceptions import AppointmentGone, BookingError, TimeframeMissed, UnmatchingCodeError from tools.kontaktdaten import decode_wochentag, validate_codes, validate_kontakt, \ validate_zeitrahmen -from tools.mousemover import move_mouse_to_coordinates +from tools.mousemover import move_mouse_to_coordinates, move_mouse_to_element from tools.utils import fire_notifications, unique try: @@ -43,7 +43,9 @@ class ImpfterminService(): - def __init__(self, codes: list, kontakt: dict, PATH: str, notifications: dict = dict()): + def __init__(self, codes: list, kontakt: dict, PATH: str, notifications=None): + if notifications is None: + notifications = dict() self.PATH = PATH self.kontakt = kontakt self.operating_system = platform.system().lower() @@ -53,10 +55,13 @@ def __init__(self, codes: list, kontakt: dict, PATH: str, notifications: dict = # Logging einstellen self.log = CLogger("impfterminservice") + # User agent festlegen + self.useragent = prepare_useragent() + # Session erstellen self.s = cloudscraper.create_scraper() self.s.headers.update({ - 'User-Agent': 'Mozilla/5.0', + 'User-Agent': self.useragent, }) # Ausgewähltes Impfzentrum prüfen @@ -67,7 +72,7 @@ def __init__(self, codes: list, kontakt: dict, PATH: str, notifications: dict = except RuntimeError as exc: self.log.error(str(exc)) self.log.info("Erneuter Versuch in 30 Sekunden") - time.sleep(30) + random_sleep(30) # Ein "Codepoint" ist ein dict, das einen Vermittlungscode ("code") # und den Zeitpunkt ("next_use") enthält, zu dem der Code frühestens @@ -167,7 +172,7 @@ def impfzentren_laden(self): result[url].append(iz) return result - def impfstoffe_laden(self, url): + def impfstoffe_laden(self, url: str): """ Lädt die verfügbaren Impstoff-Qualifikationen @@ -249,32 +254,19 @@ def get_chromedriver_path(self): if chromedriver_from_env: return chromedriver_from_env if check_webdriver(): - return webdriver_executable() - - # Chromedriver anhand des OS auswählen - if 'linux' in self.operating_system: - if "64" in platform.architecture() or sys.maxsize > 2 ** 32: - return os.path.join(self.PATH, "tools/chromedriver/chromedriver-linux-64") - else: - return os.path.join(self.PATH, "tools/chromedriver/chromedriver-linux-32") - elif 'windows' in self.operating_system: - return os.path.join(self.PATH, "tools/chromedriver/chromedriver-windows.exe") - elif 'darwin' in self.operating_system: - if "arm" in platform.processor().lower(): - return os.path.join(self.PATH, "tools/chromedriver/chromedriver-mac-m1") - else: - return os.path.join(self.PATH, "tools/chromedriver/chromedriver-mac-intel") - else: - raise ValueError(f"Nicht unterstütztes Betriebssystem {self.operating_system}") + return str(webdriver_executable()) - def get_chrome_options(self, headless): - chrome_options = Options() + def get_chrome_options(self, headless: bool): + chrome_options = uc.ChromeOptions() # deaktiviere Selenium Logging - chrome_options.add_argument('disable-infobars') - chrome_options.add_experimental_option('useAutomationExtension', False) - chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"]) - chrome_options.add_experimental_option('excludeSwitches', ['enable-logging']) + chrome_options.add_argument('--disable-infobars') + + # TODO: according to the annotations, second param should be a dict + # FIXME invalid argument: cannot parse capability: goog:chromeOptions + # FIXME invalid argument: unrecognized chrome option: useAutomationExtension + #chrome_options.add_experimental_option('useAutomationExtension', False) + #chrome_options.add_experimental_option('excludeSwitches', ['enable-automation', 'enable-logging']) # Zur Behebung von "DevToolsActivePort file doesn't exist" # chrome_options.add_argument("-no-sandbox"); @@ -283,22 +275,24 @@ def get_chrome_options(self, headless): # Chrome head is only required for the backup booking process. # User-Agent is required for headless, because otherwise the server lets us hang. - chrome_options.add_argument("user-agent=Mozilla/5.0") chromebin_from_env = os.getenv("VACCIPY_CHROME_BIN") if chromebin_from_env: - chrome_options.binary_location = os.getenv("VACCIPY_CHROME_BIN") + # check for env variable with chromium binary path + chrome_options.binary_location = chromebin_from_env elif check_chromium(): + # check for local installed chromium and set as binary executable chrome_options.binary_location = str(chromium_executable()) chrome_options.headless = headless return chrome_options - def get_chromedriver(self, headless): - return Chrome(self.get_chromedriver_path(), options=self.get_chrome_options(headless)) + def get_chromedriver(self, headless: bool) -> WebDriver: + return uc.Chrome(executable_path=self.get_chromedriver_path(), + options=self.get_chrome_options(headless)) - def driver_enter_code(self, driver, impfzentrum, code): + def driver_enter_code(self, driver: WebDriver, impfzentrum: Dict, code: str): """ TODO xpath code auslagern """ @@ -322,7 +316,7 @@ def driver_enter_code(self, driver, impfzentrum, code): driver.add_cookie(queue_cookie) # Seite neu laden - time.sleep(5) + random_sleep(5) driver.get(location) driver.refresh() @@ -335,51 +329,18 @@ def driver_enter_code(self, driver, impfzentrum, code): # Klick auf "Auswahl bestätigen" im Cookies-Banner button_xpath = "//a[contains(@class,'cookies-info-close')][1]" - button = WebDriverWait(driver, 1).until( - EC.element_to_be_clickable((By.XPATH, button_xpath))) - action = ActionChains(driver) - - # Simulation der Mausbewegung - element = driver.find_element_by_xpath(button_xpath) - current_mouse_positon = move_mouse_to_coordinates(self.log, current_mouse_positon[0], - current_mouse_positon[1], - element.location['x'], - element.location['y'], driver) - - action.click(button).perform() + current_mouse_positon = self.move_and_click_xpath(button_xpath, current_mouse_positon, driver) # Klick auf "Vermittlungscode bereits vorhanden" button_xpath = "//input[@name=\"vaccination-approval-checked\"]/.." - button = WebDriverWait(driver, 1).until( - EC.element_to_be_clickable((By.XPATH, button_xpath))) - action = ActionChains(driver) - - # Simulation der Mausbewegung - element = driver.find_element_by_xpath(button_xpath) - current_mouse_positon = move_mouse_to_coordinates(self.log, current_mouse_positon[0], - current_mouse_positon[1], - element.location['x'], - element.location['y'], driver) - - action.click(button).perform() + current_mouse_positon = self.move_and_click_xpath(button_xpath, current_mouse_positon, driver) # Auswahl des ersten Code-Input-Feldes input_xpath = "//input[@name=\"ets-input-code-0\"]" - input_field = WebDriverWait(driver, 1).until( - EC.element_to_be_clickable((By.XPATH, input_xpath))) - action = ActionChains(driver) - - # Simulation der Mausbewegung - element = driver.find_element_by_xpath(input_xpath) - current_mouse_positon = move_mouse_to_coordinates(self.log, current_mouse_positon[0], - current_mouse_positon[1], - element.location['x'], - element.location['y'], driver) - - action.click(input_field).perform() + current_mouse_positon = self.move_and_click_xpath(input_xpath, current_mouse_positon, driver) # Code etwas realistischer eingeben - # Zu schnelle Eingabe erzeugt ebenfalls manchmal "Ein unerwarteter Fehler ist aufgetreten" + # Zu schnelle Eingabe erzeugt ebenfalls manchmal "Ein unerwarteter Fehler ist aufgetreten" for index, subcode in enumerate(code.split("-")): if index == 0: @@ -393,44 +354,50 @@ def driver_enter_code(self, driver, impfzentrum, code): input_xpath = "//input[@name=\"ets-input-code-2\"]" # Input Feld auswählen - input_field = WebDriverWait(driver, 1).until(EC.element_to_be_clickable((By.XPATH, - input_xpath))) - action = ActionChains(driver) - action.move_to_element(input_field).click().perform() + + input_field = WebDriverWait(driver, 1).until(EC.element_to_be_clickable((By.XPATH, input_xpath))) + current_mouse_positon = self.move_and_click_xpath(input_xpath, current_mouse_positon, driver) # Chars einzeln eingeben mit kleiner Pause for char in subcode: input_field.send_keys(char) - time.sleep(randint(500, 1000) / 1000) + random_sleep(0.5, percent_max_deviation=50) # Klick auf "Termin suchen" button_xpath = "//app-corona-vaccination-yes//button[@type=\"submit\"]" - button = WebDriverWait(driver, 1).until( - EC.element_to_be_clickable((By.XPATH, button_xpath))) + button = WebDriverWait(driver, 1).until(EC.element_to_be_clickable((By.XPATH, button_xpath))) action = ActionChains(driver) element = driver.find_element_by_xpath(button_xpath) # Simulation der Mausbewegung - _ = move_mouse_to_coordinates(self.log, current_mouse_positon[0], current_mouse_positon[1], - element.location['x'], element.location['y'], driver) + current_mouse_positon = move_mouse_to_element(self.log, current_mouse_positon, element, driver) action.click(button).perform() # Zweiter Klick-Versuch, falls Meldung "Es ist ein unerwarteter Fehler aufgetreten" erscheint answer_xpath = "//app-corona-vaccination-yes//span[@class=\"text-pre-wrap\"]" try: - time.sleep(0.5) + random_sleep(0.5) element = driver.find_element_by_xpath(answer_xpath) if element.text == "Es ist ein unerwarteter Fehler aufgetreten": action.click(button).perform() except Exception as e: pass - time.sleep(1.5) + random_sleep(1.5) - def driver_get_cookies(self, driver, url, manual): + def move_and_click_xpath(self, xpath: str, current_mouse_positon: Tuple[int, int], driver: WebDriver): + button = WebDriverWait(driver, 1).until(EC.element_to_be_clickable((By.XPATH, xpath))) + element = driver.find_element_by_xpath(xpath) + current_mouse_positon = move_mouse_to_element(self.log, current_mouse_positon, element, driver) + action = ActionChains(driver) + action.click(button).perform() + return current_mouse_positon + + def driver_get_cookies(self, driver: WebDriver, url: str, manual: bool): # Erstelle zufälligen Vermittlungscode für die Cookie-Generierung chars = string.ascii_uppercase + string.digits - random_code = f"{choices(chars, k=4)}-{choices(chars, k=4)}-{choices(chars, k=4)}" + random_chars = "".join(choices(chars, k=12)) + random_code = f"{random_chars[0:4]}-{random_chars[4:8]}-{random_chars[8:]}" # Kann WebDriverException nach außen werfen: self.driver_enter_code( @@ -439,7 +406,7 @@ def driver_get_cookies(self, driver, url, manual): self.log.warn( "Du hast jetzt 30 Sekunden Zeit möglichst viele Elemente im Chrome Fenster " "anzuklicken. Das Fenster schließt sich automatisch.") - time.sleep(30) + random_sleep(30) required = ["bm_sz", "akavpau_User_allowed"] optional = ["bm_sv", "bm_mi", "ak_bmsc", "_abck"] @@ -458,9 +425,11 @@ def driver_get_cookies(self, driver, url, manual): self.log.info(f"Browser-Cookie generiert: *{cookies['bm_sz'][-6:]}") return cookies - def driver_termin_buchen(self, driver, reservierung): + def driver_termin_buchen(self, driver: WebDriver, reservierung: Dict): timestamp = time.strftime("%Y%m%d-%H%M%S") filepath = os.path.join(self.PATH, "tools", "log") + current_mouse_positon = (randint(1, driver.get_window_size()["width"]-1), + randint(1, driver.get_window_size()["height"]-1)) try: self.driver_enter_code( @@ -472,10 +441,7 @@ def driver_termin_buchen(self, driver, reservierung): try: # Klick auf "Termin suchen" button_xpath = "//button[@data-target=\"#itsSearchAppointmentsModal\"]" - button = WebDriverWait(driver, 1).until( - EC.element_to_be_clickable((By.XPATH, button_xpath))) - action = ActionChains(driver) - action.move_to_element(button).click().perform() + current_mouse_positon = self.move_and_click_xpath(button_xpath, current_mouse_positon, driver) except: self.log.error("Termine können nicht gesucht werden") try: @@ -487,14 +453,10 @@ def driver_termin_buchen(self, driver, reservierung): # Termin auswählen try: - time.sleep(3) - button_xpath = '//*[@id="itsSearchAppointmentsModal"]/div/div/div[2]/div/div/form/' \ - 'div[1]/div[2]/label/div[2]/div' - button = WebDriverWait(driver, 1).until( - EC.element_to_be_clickable((By.XPATH, button_xpath))) - action = ActionChains(driver) - action.move_to_element(button).click().perform() - time.sleep(.5) + random_sleep(3) + button_xpath = '//*[@id="itsSearchAppointmentsModal"]/div/div/div[2]/div/div/form/div[1]/div[2]/label/div[2]/div' + current_mouse_positon = self.move_and_click_xpath(button_xpath, current_mouse_positon, driver) + random_sleep(.5) except: self.log.error("Termine können nicht ausgewählt werden") try: @@ -510,24 +472,17 @@ def driver_termin_buchen(self, driver, reservierung): # Klick Button "AUSWÄHLEN" try: button_xpath = '//*[@id="itsSearchAppointmentsModal"]//button[@type="submit"]' - button = WebDriverWait(driver, 1).until( - EC.element_to_be_clickable((By.XPATH, button_xpath))) - action = ActionChains(driver) - action.move_to_element(button).click().perform() - time.sleep(.5) + current_mouse_positon = self.move_and_click_xpath(button_xpath, current_mouse_positon, driver) + random_sleep(.5) except: self.log.error("Termine können nicht ausgewählt werden (Button)") pass # Klick Daten erfassen try: - button_xpath = '/html/body/app-root/div/app-page-its-search/div/div/div[2]/div/div/' \ - 'div[5]/div/div[2]/div[2]/div[2]/button' - button = WebDriverWait(driver, 1).until( - EC.element_to_be_clickable((By.XPATH, button_xpath))) - action = ActionChains(driver) - action.move_to_element(button).click().perform() - time.sleep(.5) + button_xpath = '/html/body/app-root/div/app-page-its-search/div/div/div[2]/div/div/div[5]/div/div[2]/div[2]/div[2]/button' + current_mouse_positon = self.move_and_click_xpath(button_xpath, current_mouse_positon, driver) + random_sleep(.5) except: self.log.error("1. Daten können nicht erfasst werden") pass @@ -544,17 +499,12 @@ def driver_termin_buchen(self, driver, reservierung): '[contains(@class,"ets-radio-wrapper")]/label[@class=' \ '"ets-radio-control"]/span[contains(text(),"Divers")]' - button = WebDriverWait(driver, 1).until( - EC.element_to_be_clickable((By.XPATH, button_xpath))) - action = ActionChains(driver) - action.move_to_element(button).click().perform() + current_mouse_positon = self.move_and_click_xpath(button_xpath, current_mouse_positon, driver) # Input Vorname - input_xpath = '//*[@id="itsSearchContactModal"]//app-booking-contact-form//input' \ - '[@formcontrolname="firstname"]' - input_field = WebDriverWait(driver, 1).until( - EC.element_to_be_clickable((By.XPATH, input_xpath))) - action.move_to_element(input_field).click().perform() + input_xpath = '//*[@id="itsSearchContactModal"]//app-booking-contact-form//input[@formcontrolname="firstname"]' + input_field = WebDriverWait(driver, 1).until(EC.element_to_be_clickable((By.XPATH, input_xpath))) + current_mouse_positon = self.move_and_click_xpath(input_xpath, current_mouse_positon, driver) input_field.send_keys(self.kontakt['vorname']) # Input Nachname @@ -610,23 +560,16 @@ def driver_termin_buchen(self, driver, reservierung): # Klick Button "ÜBERNEHMEN" try: button_xpath = '//*[@id="itsSearchContactModal"]//button[@type="submit"]' - button = WebDriverWait(driver, 1).until( - EC.element_to_be_clickable((By.XPATH, button_xpath))) - action = ActionChains(driver) - action.move_to_element(button).click().perform() - time.sleep(.7) + current_mouse_positon = self.move_and_click_xpath(button_xpath, current_mouse_positon, driver) + random_sleep(.7) except: self.log.error("Button ÜBERNEHMEN kann nicht gedrückt werden") pass # Termin buchen try: - button_xpath = '/html/body/app-root/div/app-page-its-search/div/div/div[2]/' \ - 'div/div/div[5]/div/div[3]/div[2]/div[2]/button' - button = WebDriverWait(driver, 1).until( - EC.element_to_be_clickable((By.XPATH, button_xpath))) - action = ActionChains(driver) - action.move_to_element(button).click().perform() + button_xpath = '/html/body/app-root/div/app-page-its-search/div/div/div[2]/div/div/div[5]/div/div[3]/div[2]/div[2]/button' + current_mouse_positon = self.move_and_click_xpath(button_xpath, current_mouse_positon, driver) except: self.log.error("Button Termin buchen kann nicht gedrückt werden") pass @@ -635,7 +578,7 @@ def driver_termin_buchen(self, driver, reservierung): if "Ihr Termin am" not in str(driver.page_source): raise BookingError() - def get_cookies(self, url, manual): + def get_cookies(self, url: str, manual: bool) -> dict: """ Cookies der Session erneuern, wenn sie abgelaufen sind. :return: @@ -651,7 +594,7 @@ def get_cookies(self, url, manual): finally: driver.quit() - def selenium_termin_buchen(self, reservierung): + def selenium_termin_buchen(self, reservierung: Dict): """ Backup Prozess: Wenn die Terminbuchung mit dem Bot nicht klappt, wird das @@ -676,7 +619,7 @@ def selenium_termin_buchen(self, reservierung): finally: driver.quit() - def login(self, plz_impfzentrum, code, cookies): + def login(self, plz_impfzentrum: str, code: str, cookies): """ Einloggen mittels Vermittlungscode, um qualifizierte Impfstoffe zu erhalten. @@ -721,7 +664,7 @@ def login(self, plz_impfzentrum, code, cookies): "Login mit Code fehlgeschlagen: " f"JSONDecodeError: {str(exc)}") from exc - def reservierung_finden(self, zeitrahmen: dict, plz: str): + def reservierung_finden(self, zeitrahmen: dict, plz: str) -> Optional[Dict]: url = self.impfzentrum_in_plz(plz)["URL"] codepoints = self.codepoints[url] if not codepoints: @@ -760,7 +703,7 @@ def reservierung_finden(self, zeitrahmen: dict, plz: str): return None def reservierung_finden_mit_code( - self, zeitrahmen: dict, plz: str, code: str): + self, zeitrahmen: Dict, plz: str, code: str) -> Optional[Dict]: """ Es wird überprüft, ob im Impfzentrum in der gegebenen PLZ ein oder mehrere Terminpaare (oder Einzeltermine) verfügbar sind, die dem @@ -985,7 +928,7 @@ def code_anfordern(self, mail, telefonnummer, except RequestException as exc: self.log.error(f"Vermittlungscode kann nicht angefragt werden: {str(exc)}") self.log.info("Erneuter Versuch in 30 Sekunden") - time.sleep(30) + random_sleep(30) continue # Neuer Versuch in nächster Iteration if res.status_code == 429: @@ -1004,7 +947,7 @@ def code_anfordern(self, mail, telefonnummer, "Code kann nicht angefragt werden: " f"{res.status_code} {res.text}") self.log.info("Erneuter Versuch in 30 Sekunden") - time.sleep(30) + random_sleep(30) continue # Neuer Versuch in nächster Iteration try: @@ -1012,13 +955,16 @@ def code_anfordern(self, mail, telefonnummer, except JSONDecodeError as exc: raise RuntimeError(f"JSONDecodeError: {str(exc)}") from exc - return (token, cookies) + return token, cookies - def selenium_code_anfordern(self, mail, telefonnummer, - plz_impfzentrum, geburtsdatum): - """ - SMS-Code beim Impfterminservice via Selenium anfordern. + def selenium_code_anfordern(self, mail: str, telefonnummer: str, + plz_impfzentrum: str, geburtsdatum: str) -> bool: + return self.undetected_selenium_code_anfordern(mail, telefonnummer, plz_impfzentrum, geburtsdatum) + def undetected_selenium_code_anfordern(self, mail: str, telefonnummer: str, + plz_impfzentrum: str, geburtsdatum: str) -> bool: + """ + SMS-Code beim Impfterminservice via undetected Selenium anfordern. :param mail: Mail für Empfang des Codes :param telefonnummer: Telefonnummer für SMS-Code, inkl. Präfix +49 :param plz_impfzentrum: PLZ des Impfzentrums, für das ein Code erstellt werden soll @@ -1037,260 +983,153 @@ def selenium_code_anfordern(self, mail, telefonnummer, "einzeltermin": False } - # Wire Selenium driver um request im webdriver auszulesen - driver = selenium_wire.Chrome(self.get_chromedriver_path(), - options=self.get_chrome_options(False)) + driver = self.get_chromedriver(headless=False) - while True: + driver.get(f"{url}impftermine/service?plz={plz_impfzentrum}") + self.log.info("Generierung eines Vermittlungscodes via Selenium gestartet.") - driver.get(f"{url}impftermine/service?plz={plz_impfzentrum}") - self.log.info("Generierung eines Vermittlungscodes via Selenium gestartet.") + # Queue Bypass + while True: + queue_cookie = driver.get_cookie("akavpwr_User_allowed") - # Queue Bypass - while True: - queue_cookie = driver.get_cookie("akavpwr_User_allowed") + if not queue_cookie \ + or "Virtueller Warteraum" not in driver.page_source: + break - if not queue_cookie \ - or "Virtueller Warteraum" not in driver.page_source: - break + self.log.info("Im Warteraum, Seite neu laden") + queue_cookie["name"] = "akavpau_User_allowed" + driver.add_cookie(queue_cookie) - self.log.info("Im Warteraum, Seite neu laden") - queue_cookie["name"] = "akavpau_User_allowed" - driver.add_cookie(queue_cookie) + # Seite des Impzentrums laden + driver.get(f"{url}impftermine/service?plz={plz_impfzentrum}") + driver.refresh() + + # ets-session-its-cv-quick-check im SessionStorage setzen um verfügbare Termine + # zu simulieren + ets_session_its_cv_quick_check = '{"birthdate":"' + data["birthday"] + \ + '","slotsAvailable":{"pair":true,"single":false}}' + driver.execute_script( + 'window.sessionStorage.setItem("ets-session-its-cv-quick-check",\'' + + ets_session_its_cv_quick_check + '\');') + # self.log.info( + # "\"ets-session-its-cv-quick-check\" Key:Value zum sessionStorage hinzugefügt.") + + # Durch ets-session-its-cv-quick-check im SessionStorage kann direkt der Check + # aufgerufen werden + driver.get(f"{url}impftermine/check") + self.log.info("Überprüfung der Impfberechtigung übersprungen / Vorhandene Termine " + "simuliert und impftermine/check geladen.") + + random_sleep(2) + + # Anpassen der HTML elemente im Browser um Nutzer aktuellen Status anzuzeigen + check_h1_xpath = "//app-its-check-success//h1" + check_h1 = driver.find_element_by_xpath(check_h1_xpath) + driver.execute_script("arguments[0].setAttribute('style','color: #FF0000;font-weight: " + "bold; font-size: 35px;')", check_h1) + driver.execute_script(f"arguments[0].innerText='Vaccipy! - Bitte nichts eingeben " + f"oder anklicken.'", check_h1) + check_p_xpath = "//app-its-check-success//p" + check_p = driver.find_element_by_xpath(check_p_xpath) + driver.execute_script("arguments[0].setAttribute('style','font-weight: bold; " + "font-size: 25px;')", check_p) - # Seite des Impzentrums laden - time.sleep(5) - driver.get(f"{url}impftermine/service?plz={plz_impfzentrum}") - driver.refresh() + # Klick auf "Auswahl bestätigen" im Cookies-Banner + button_xpath = "//a[contains(@class,'cookies-info-close')][1]" + button = WebDriverWait(driver, 1).until( + EC.element_to_be_clickable((By.XPATH, button_xpath))) + action = ActionChains(driver) + driver.execute_script(f"arguments[0].innerText='Status: Cookie-Banner Anklicken'", check_p) + action.click(button).perform() - # ets-session-its-cv-quick-check im SessionStorage setzen um verfügbare Termine - # zu simulieren - ets_session_its_cv_quick_check = '{"birthdate":"' + data["birthday"] + \ - '","slotsAvailable":{"pair":true,"single":false}}' - driver.execute_script( - 'window.sessionStorage.setItem("ets-session-its-cv-quick-check",\'' - + ets_session_its_cv_quick_check + '\');') - self.log.info( - "\"ets-session-its-cv-quick-check\" Key:Value zum sessionStorage hinzugefügt.") + # Eingabe Mail + input_xpath = "//input[@formcontrolname=\"email\"]" + driver.execute_script(f"arguments[0].innerText='Status: E-Mail wird eingegeben'", check_p) + # Input Feld auswählen + input_field = WebDriverWait(driver, 1).until(EC.element_to_be_clickable((By.XPATH, input_xpath))) + action = ActionChains(driver) + action.move_to_element(input_field).click().perform() + input_field.send_keys(data['email']) + + # Eingabe Phone + input_xpath = "//input[@formcontrolname=\"phone\"]" + driver.execute_script(f"arguments[0].innerText='Status: Telefonnummer wird eingegeben'", check_p) + # Input Feld auswählen + input_field = WebDriverWait(driver, 1).until(EC.element_to_be_clickable((By.XPATH, input_xpath))) + action = ActionChains(driver) + action.move_to_element(input_field).click().perform() + input_field.send_keys(data['phone'][3:]) - # Durch ets-session-its-cv-quick-check im SessionStorage kann direkt der Check - # aufgerufen werden - driver.get(f"{url}impftermine/check") - self.log.info("Überprüfung der Impfberechtigung übersprungen / Vorhandene Termine " - "simuliert und impftermine/check geladen.") + # Anfrage absenden + button_xpath = "//app-its-check-success//button[@type=\"submit\"]" + button = WebDriverWait(driver, 1).until( + EC.element_to_be_clickable((By.XPATH, button_xpath))) + action = ActionChains(driver) + driver.execute_script(f"arguments[0].innerText='Status: Versuche Anfrage abzuschicken'", check_p) + action.move_to_element(button).click().perform() - time.sleep(1) + # Zweiter Klick-Versuch, falls Meldung "Es ist ein unerwarteter Fehler aufgetreten" erscheint + # Falls eine andere Meldung aufgetrteten ist -> Abbruch + try: + answer_xpath = "//app-its-check-success//span[@class=\"text-pre-wrap\"]" + random_sleep(0.5) + element = driver.find_element_by_xpath(answer_xpath) + except Exception as e: + element = None - # Anpassen der HTML elemente im Browser um Nutzer aktuellen Status anzuzeigen - check_h1_xpath = "//app-its-check-success//h1" - check_h1 = driver.find_element_by_xpath(check_h1_xpath) - driver.execute_script("arguments[0].setAttribute('style','color: #FF0000;font-weight: " - "bold; font-size: 35px;')", check_h1) - driver.execute_script(f"arguments[0].innerText='Vaccipy! - Bitte nichts eingeben " - f"oder anklicken.'", check_h1) - check_p_xpath = "//app-its-check-success//p" - check_p = driver.find_element_by_xpath(check_p_xpath) - driver.execute_script("arguments[0].setAttribute('style','font-weight: bold; " - "font-size: 25px;')", check_p) - - # random start position - current_mouse_positon = (randint(1, driver.get_window_size()["width"] - 1), - randint(1, driver.get_window_size()["height"] - 1)) - - # Simulation der Mausbewegung - driver.execute_script(f"arguments[0].innerText='Status: Maussimulation nach x: " - f"{current_mouse_positon[0]}, y:{current_mouse_positon[1]}'", - check_p) - current_mouse_positon = move_mouse_to_coordinates(self.log, 0, 0, - current_mouse_positon[0], - current_mouse_positon[1], driver) - - # Klick auf "Auswahl bestätigen" im Cookies-Banner - button_xpath = "//a[contains(@class,'cookies-info-close')][1]" - button = WebDriverWait(driver, 1).until( - EC.element_to_be_clickable((By.XPATH, button_xpath))) - action = ActionChains(driver) - - # Simulation der Mausbewegung - element = driver.find_element_by_xpath(button_xpath) - driver.execute_script(f"arguments[0].innerText='Status: Maussimulation nach x: " - f"{element.location['x']}, y: {element.location['y']}'", check_p) - current_mouse_positon = move_mouse_to_coordinates(self.log, current_mouse_positon[0], - current_mouse_positon[1], - element.location['x'], - element.location['y'], driver) - driver.execute_script(f"arguments[0].innerText='Status: Cookie-Banner Anklicken'", - check_p) - action.click(button).perform() - time.sleep(0.5) - - # Eingabe Mail - input_xpath = "//input[@formcontrolname=\"email\"]" - # Input Feld auswählen - input_field = WebDriverWait(driver, 1).until(EC.element_to_be_clickable((By.XPATH, - input_xpath))) - action = ActionChains(driver) - - # Simulation der Mausbewegung - element = driver.find_element_by_xpath(input_xpath) - driver.execute_script(f"arguments[0].innerText='Status: Maussimulation nach x: " - f"{element.location['x']}, y: {element.location['y']}'", - check_p) - current_mouse_positon = move_mouse_to_coordinates(self.log, current_mouse_positon[0], - current_mouse_positon[1], - element.location['x'], - element.location['y'], driver) - action.move_to_element(input_field).click().perform() + if element: + if element.text == "Es ist ein unerwarteter Fehler aufgetreten": + driver.execute_script(f"arguments[0].innerText='Status: Zweiter Versuch " + f"Anfrage abzuschicken'", check_p) + action.move_to_element(button).click().perform() + elif element.text == "Anfragelimit erreicht.": + driver.close() + raise RuntimeError("Anfragelimit erreicht") + elif element.text == "Geburtsdatum ungueltig oder in der Zukunft": + driver.close() + raise RuntimeError("Geburtsdatum ungueltig oder in der Zukunft") - # Chars einzeln eingeben mit kleiner Pause - driver.execute_script(f"arguments[0].innerText='Status: E-Mail wird eingegeben'", - check_p) - for char in data['email']: - input_field.send_keys(char) - time.sleep(randint(500, 1000) / 1000) + # Prüfen ob SMS Verifizierung geladen wurde falls nicht Abbruch + sms_verifizierung_h1_xpath = "//app-page-its-check-result//h1" + sms_verifizierung_h1 = driver.find_element_by_xpath(sms_verifizierung_h1_xpath) + if sms_verifizierung_h1.text != "SMS Verifizierung": + driver.close() + raise RuntimeError("Vermittlungscode kann derzeit nicht angefragt werden. " + "Versuchen Sie es später erneut.") - self.log.info("E-Mail Adresse eingegeben.") - time.sleep(0.5) + # Ab jetzt befinden wir uns auf der SMS Verifizierung Seite + success_location = f"{url}impftermine/service/{plz_impfzentrum}" + self.log.info("SMS-Anfrage an Server versandt.") + self.log.info("Bitte SMS-Code innerhalb der nächsten 90 Sekunden im Browser-Fenster " + "eingeben.") - # Eingabe Phone - input_xpath = "//input[@formcontrolname=\"phone\"]" - # Input Feld auswählen - input_field = WebDriverWait(driver, 1).until(EC.element_to_be_clickable((By.XPATH, - input_xpath))) - action = ActionChains(driver) - - # Simulation der Mausbewegung - element = driver.find_element_by_xpath(input_xpath) - driver.execute_script(f"arguments[0].innerText='Status: Maussimulation nach x: " - f"{element.location['x']}, y: {element.location['y']}'", check_p) - current_mouse_positon = move_mouse_to_coordinates(self.log, current_mouse_positon[0], - current_mouse_positon[1], - element.location['x'], - element.location['y'], driver) - action.move_to_element(input_field).click().perform() + # 90 Sekunden lang auf Antwort vom Server warten + # Eventuell gibt User seinen Pin falsch ein etc. + max_sms_code_eingabe_sekunden = 90 + while max_sms_code_eingabe_sekunden: - # Chars einzeln eingeben mit kleiner Pause - driver.execute_script(f"arguments[0].innerText='Status: Telefonnummer wird eingegeben'", - check_p) - for char in data['phone'][3:]: - input_field.send_keys(char) - time.sleep(randint(500, 1000) / 1000) - - self.log.info("Telefonnummer eingegeben.") - time.sleep(0.5) - - # Anfrage absenden - button_xpath = "//app-its-check-success//button[@type=\"submit\"]" - button = WebDriverWait(driver, 1).until( - EC.element_to_be_clickable((By.XPATH, button_xpath))) - action = ActionChains(driver) - - # Simulation der Mausbewegung - element = driver.find_element_by_xpath(button_xpath) - driver.execute_script(f"arguments[0].innerText='Status: Maussimulation nach x: " - f"{element.location['x']}, y: {element.location['y']}'", check_p) - current_mouse_positon = move_mouse_to_coordinates(self.log, current_mouse_positon[0], - current_mouse_positon[1], - element.location['x'], - element.location['y'], driver) - driver.execute_script( - f"arguments[0].innerText='Status: Versuche Anfrage abzuschicken'", - check_p) - action.move_to_element(button).click().perform() - - # Zweiter Klick-Versuch, falls Meldung "Es ist ein unerwarteter Fehler aufgetreten" erscheint - # Falls eine andere Meldung aufgetrteten ist -> Abbruch + # Verbleibende Zeit anzeigen try: - answer_xpath = "//app-its-check-success//span[@class=\"text-pre-wrap\"]" - time.sleep(0.5) - element = driver.find_element_by_xpath(answer_xpath) + driver.execute_script(f"arguments[0].innerText='Vaccipy! - Bitte SMS-Code " + f"im Browser eingeben. Noch {max_sms_code_eingabe_sekunden} " + f"Sekunden verbleibend.'", sms_verifizierung_h1) except Exception as e: - element = None - - if element: - if element.text == "Es ist ein unerwarteter Fehler aufgetreten": - driver.execute_script(f"arguments[0].innerText='Status: Zweiter Versuch " - f"Anfrage abzuschicken'", check_p) - action.move_to_element(button).click().perform() - elif element.text == "Anfragelimit erreicht.": - driver.close() - raise RuntimeError("Anfragelimit erreicht") - elif element.text == "Geburtsdatum ungueltig oder in der Zukunft": - driver.close() - raise RuntimeError("Geburtsdatum ungueltig oder in der Zukunft") - - time.sleep(2) - - # Prüfen ob SMS Verifizierung geladen wurde falls nicht Abbruch - sms_verifizierung_h1_xpath = "//app-page-its-check-result//h1" - sms_verifizierung_h1 = driver.find_element_by_xpath(sms_verifizierung_h1_xpath) - if sms_verifizierung_h1.text != "SMS Verifizierung": - driver.close() - raise RuntimeError("Vermittlungscode kann derzeit nicht angefragt werden. " - "Versuchen Sie es später erneut.") + pass - # Ab jetzt befinden wir uns auf der SMS Verifizierung Seite - location = f"{url}rest/smspin/verifikation" - self.log.info("SMS-Anfrage an Server versandt.") - self.log.info("Bitte SMS-Code innerhalb der nächsten 60 Sekunden im Browser-Fenster " - "eingeben.") - - # 90 Sekunden lang auf Antwort vom Server warten - # Eventuell gibt User seinen Pin falsch ein etc. - max_sms_code_eingabe_sekunden = 90 - while max_sms_code_eingabe_sekunden: + if driver.current_url == success_location: + self.log.info("Bestätigungscode erfolgreich an Server versandt. Bitte prüfen Sie Ihre E-Mails.") + driver.close() + return True - # Verbleibende Zeit anzeigen - try: - driver.execute_script(f"arguments[0].innerText='Vaccipy! - Bitte SMS-Code " - f"im Browser eingeben. Noch {max_sms_code_eingabe_sekunden} " - f"Sekunden verbleibend.'", sms_verifizierung_h1) - except Exception as e: - pass - - # Alle bisherigen Requests laden - sms_verification_responses = [] - for request in driver.requests: - if request.url == location: - sms_verification_responses.append(request.response) - - if sms_verification_responses: - - # Neuste Antowrt vom Server auslesen - # User kann z.B 2 mal den Pin falsch eingeben - # uns interessiert nur die neuste Antwort vom Server - latest_reponse = sms_verification_responses[-1] - - if latest_reponse is not None: - if latest_reponse.status_code == 400: - try: - # error Laden - error = json.loads(latest_reponse.body.decode('utf-8'))['error'] - except JSONDecodeError as exc: - raise RuntimeError(f"JSONDecodeError: {str(exc)}") from exc - - if error == "Pin ungültig": - self.log.info("Der eingegebene SMS-Code ist ungültig.") - - elif latest_reponse.status_code == 200: - self.log.info(f"SMS-Code erfolgreich übermittelt. Bitte Prüfen " - f"Sie Ihre E-Mails.") - driver.close() - return True - elif latest_reponse.status_code == 429: - driver.close() - raise RuntimeError("SMS-Code konnte nicht übermittelt werden. " - "Blockiert durch Botprotection.") - - time.sleep(1) - max_sms_code_eingabe_sekunden -= 1 + time.sleep(1) + max_sms_code_eingabe_sekunden -= 1 - driver.close() - self.log.info(f"SMS-Verifikation nicht innerhalb von 90 Sekunden abgeschlossen. " - f"Versuchen Sie es später erneut.") - return False + driver.close() + self.log.info(f"SMS-Verifikation nicht innerhalb von 90 Sekunden abgeschlossen. " + f"Versuchen Sie es später erneut.") + return False - def code_bestaetigen(self, token, cookies, sms_pin, plz_impfzentrum): + def code_bestaetigen(self, token: str, cookies: Dict, sms_pin: str, plz_impfzentrum: str) -> bool: """ Bestätigung der Code-Generierung mittels SMS-Code @@ -1304,7 +1143,6 @@ def code_bestaetigen(self, token, cookies, sms_pin, plz_impfzentrum): data = { "token": token, "smspin": sms_pin - } manual = False @@ -1325,7 +1163,7 @@ def code_bestaetigen(self, token, cookies, sms_pin, plz_impfzentrum): except RequestException as exc: self.log.error(f"Code-Verifikation fehlgeschlagen: {str(exc)}") self.log.info("Erneuter Versuch in 30 Sekunden") - time.sleep(30) + random_sleep(30) continue # Neuer Versuch in nächster Iteration if res.status_code == 429: @@ -1344,7 +1182,7 @@ def code_bestaetigen(self, token, cookies, sms_pin, plz_impfzentrum): "Code-Verifikation fehlgeschlagen: " f"{res.status_code} {res.text}") self.log.info("Erneuter Versuch in 30 Sekunden") - time.sleep(30) + random_sleep(30) continue # Neuer Versuch in nächster Iteration self.log.success( @@ -1352,7 +1190,7 @@ def code_bestaetigen(self, token, cookies, sms_pin, plz_impfzentrum): "bitte prüfe deine Mails!") return True - def impfzentrum_in_plz(self, plz_impfzentrum): + def impfzentrum_in_plz(self, plz_impfzentrum) -> Dict: for url, gruppe in self.impfzentren.items(): for iz in gruppe: if iz["PLZ"] == plz_impfzentrum: @@ -1369,7 +1207,7 @@ def notify(self, title: str, msg: str): @staticmethod def terminsuche(codes: list, plz_impfzentren: list, kontakt: dict, - PATH: str, notifications: dict = {}, zeitrahmen: dict = dict(), + PATH: str, notifications: Dict = None, zeitrahmen: Dict = None, check_delay: int = 30): """ Sucht mit mehreren Vermittlungscodes bei einer Liste von Impfzentren nach @@ -1393,6 +1231,10 @@ def terminsuche(codes: list, plz_impfzentren: list, kontakt: dict, :return: """ + if zeitrahmen is None: + zeitrahmen = {} + if notifications is None: + notifications = {} validate_codes(codes) validate_kontakt(kontakt) validate_zeitrahmen(zeitrahmen) @@ -1418,7 +1260,7 @@ def terminsuche(codes: list, plz_impfzentren: list, kontakt: dict, # Einmal Chrome starten, um früh einen Fehler zu erzeugen, falls die # erforderliche Software nicht installiert ist. - its.log.info("Teste Chromedriver") + its.log.info("Prüfen von Chromium und Chromedriver") its.get_chromedriver(headless=True).quit() for plz_impfzentrum in cycle(plz_impfzentren): @@ -1461,10 +1303,10 @@ def terminsuche(codes: list, plz_impfzentren: list, kontakt: dict, # verwenden. its.rotiere_codepoints(url) - time.sleep(check_delay) + random_sleep(check_delay) -def terminpaar_im_zeitrahmen(terminpaar, zeitrahmen): +def terminpaar_im_zeitrahmen(terminpaar, zeitrahmen) -> bool: """ Checken ob Terminpaar im angegebenen Zeitrahmen liegt @@ -1509,6 +1351,16 @@ def terminpaar_im_zeitrahmen(terminpaar, zeitrahmen): return True +def random_sleep(avg_sleeptime: float, percent_max_deviation: Optional[int] = None): + if percent_max_deviation is None: + percent_max_deviation = 10 + percent_deviation = random.randrange(-percent_max_deviation, percent_max_deviation) + random_sleeptime = avg_sleeptime * (1.0 + (percent_deviation / 100.0)) + min_sleeptime = 0 if avg_sleeptime <= 0.1 else 0.1 + sleeptime = max(min_sleeptime, random_sleeptime) + time.sleep(sleeptime) + + def get_headers(code: str): b = bytes(f':{code}', encoding='utf-8') bearer = f"Basic {b64encode(b).decode('utf-8')}" @@ -1517,3 +1369,21 @@ def get_headers(code: str): def extrahiere_impfstoffe(qualifikation: dict): return qualifikation.get("tssname", "N/A").replace(" ", "").split(",") + + +def prepare_useragent() -> str: + """ + Generiert einen zufälligen User Agent + + :return: User agent + """ + user_agents = [ + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36', + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.157 Safari/537.36', + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/11.1.2 Safari/605.1.15', + 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36', + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/603.3.8 (KHTML, like Gecko) Version/10.1.2 Safari/603.3.8', + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36', + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36', + ] + return choice(user_agents) diff --git a/tools/mousemover.py b/tools/mousemover.py index c08b3d9e..91ea57ee 100644 --- a/tools/mousemover.py +++ b/tools/mousemover.py @@ -1,9 +1,15 @@ import math +import random import time from random import randint +from typing import Tuple from selenium.common.exceptions import MoveTargetOutOfBoundsException from selenium.webdriver import ActionChains +from selenium.webdriver.chrome.webdriver import WebDriver +from selenium.webdriver.remote.webelement import WebElement + +from tools.clog import CLogger def move_mouse_by_offsets(x_coordinates: list, y_coordinates: list, driver) -> tuple: @@ -93,10 +99,21 @@ def generate_way_between_coordinates(source_x: int, source_y: int, target_x: int if source_y == target_y: y_target_reached = True + # Wiggle a bit + if not x_target_reached or not y_target_reached: + if x_target_reached: + source_x += random.randint(-x_min_stepwidth, x_min_stepwidth) + if y_target_reached: + source_y += random.randint(-y_min_stepwidth, y_min_stepwidth) + # Append new waypoint coordinates x_coordinates.append(source_x) y_coordinates.append(source_y) + # Don't hit exactly + x_coordinates.append(source_x + random.randint(-x_min_stepwidth, x_min_stepwidth)) + y_coordinates.append(source_y + random.randint(-y_min_stepwidth, y_min_stepwidth)) + return x_coordinates, y_coordinates @@ -122,7 +139,21 @@ def pick_next_step(source: int, target: int, max_stepwidth: int, min_stepwidth: return source + step_x -def move_mouse_to_coordinates(log, start_x: int, start_y: int, target_x: int, target_y: int, driver) -> tuple: +def move_mouse_to_element(log: CLogger, current_positon: Tuple[int, int], element: WebElement, driver: WebDriver) -> tuple: + """Move mouse from x,y coordinates to the coordinates of the element + + Args: + current_positon (tuple[int, int]): start position + element (WebElement): target element + driver (WebDriver): Chromedriver + + Returns: + tuple: Current mouse coordinates (mouse_x, mouse_y) + """ + return move_mouse_to_coordinates(log, current_positon[0], current_positon[1], element.location['x'], element.location['y'], driver) + + +def move_mouse_to_coordinates(log: CLogger, start_x: int, start_y: int, target_x: int, target_y: int, driver: WebDriver) -> tuple: """Move mouse from x,y coordinates to x,y coordinates Args: @@ -138,8 +169,6 @@ def move_mouse_to_coordinates(log, start_x: int, start_y: int, target_x: int, ta # Generate waypoints coordinates_to_element = generate_way_between_coordinates(start_x, start_y, target_x, target_y) - log.info(f"Simulation der Mausbewegungen gestartet. Von: ({start_x}, {start_y}) nach ({target_x}, {target_y})") - # Execute movements and return coordinates return move_mouse_by_offsets(coordinates_to_element[0], coordinates_to_element[1], driver) diff --git a/tools/utils.py b/tools/utils.py index 554d926f..e4e2f276 100644 --- a/tools/utils.py +++ b/tools/utils.py @@ -2,8 +2,6 @@ import time import traceback import random -import json -import sys from json import JSONDecodeError from pathlib import Path from threading import Thread