# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: 2023 Lucca Baumgärtner

import pandas as pd
from email_validator import EmailNotValidError, validate_email

from bdc.steps.step import Step

# Domains that are treated as "commercial" (free-mail / ISP mailboxes) and are
# therefore not reported as a lead's custom domain. Entries are lower-case and
# compared against the lower-cased domain of each email address.
COMMERCIAL_DOMAINS = (
    "web.de",
    "mail.com",
    "mail.de",
    "msn.com",
    "gmail.com",
    "yahoo.com",
    "hotmail.com",
    "aol.com",
    "hotmail.co.uk",
    "hotmail.fr",
    "yahoo.fr",
    "live.com",
    "gmx.de",
    "outlook.com",
    "icloud.com",
    "outlook.de",
    "online.de",
    "gmx.net",
    "googlemail.com",
    "yahoo.de",
    "t-online.de",
    "gmx.ch",
    "gmx.at",
    "hotmail.ch",
    "live.nl",
    "hotmail.de",
    "home.nl",
    "bluewin.ch",
    "freenet.de",
    "upcmail.nl",
    "zeelandnet.nl",
    "hotmail.nl",
    "arcor.de",
    "aol.de",
    "me.com",
    "gmail.con",
    "office.de",
    "my.com",
)


def extract_custom_domain(email: str) -> pd.Series:
    """Validate *email* syntactically and extract its domain part.

    Args:
        email: The email address to check.

    Returns:
        A two-element ``pd.Series``: ``[domain, True]`` when the address
        passes ``validate_email`` (deliverability is not checked), otherwise
        ``[None, False]``. The domain is lower-cased.
    """
    try:
        validate_email(email, check_deliverability=False)
    except EmailNotValidError:
        return pd.Series([None, False])
    # Split on the last "@" so the domain is taken from the end of the address,
    # and lower-case it so the comparison with COMMERCIAL_DOMAINS does not
    # depend on how the address was typed.
    return pd.Series([email.rsplit("@", 1)[1].lower(), True])


def _name_in_account(name, account: str) -> bool:
    """Return True if *name* is a non-blank string contained in *account*.

    *account* is expected to be lower-cased by the caller. Missing values
    (``None``, NaN, any non-string) and blank names never match.
    """
    if not isinstance(name, str):
        return False
    needle = name.strip().lower()
    # An empty needle would be "contained" in every string; treat it as no match.
    return bool(needle) and needle in account


def analyze_email_account(lead) -> pd.Series:
    """Check whether the lead's first/last name appears in the email account name.

    Args:
        lead: A row-like mapping (e.g. a DataFrame row) providing
            ``"email_valid"`` and ``"Email"``, and optionally ``"First Name"``
            and ``"Last Name"``.

    Returns:
        A two-element ``pd.Series`` of booleans
        ``[first_name_in_account, last_name_in_account]``. Both are ``False``
        when ``lead["email_valid"]`` is falsy. The comparison is
        case-insensitive substring matching against the part of the address
        before the first ``"@"``.
    """
    if not lead["email_valid"]:
        return pd.Series([False, False])
    account = str(lead["Email"]).split("@")[0].lower()
    return pd.Series(
        [
            _name_in_account(lead.get("First Name"), account),
            _name_in_account(lead.get("Last Name"), account),
        ]
    )


class AnalyzeEmails(Step):
    """
    A pipeline step performing various preprocessing steps with the given email address.
    The following columns will be added on successful processing:

    - **domain**: The custom domain name/website if any (None for invalid
      addresses and for domains listed in ``COMMERCIAL_DOMAINS``)
    - **email_valid**: Boolean result of email check
    - **first_name_in_account**: Boolean, True if the given first name is part of the email account name
    - **last_name_in_account**: Boolean, True if the given last name is part of the email account name
    """

    name = "Analyze-Emails"

    def load_data(self):
        """No-op: this step does not load any data itself."""
        pass

    def verify(self):
        """Return True if a DataFrame is set and has the columns this step reads."""
        return (
            self._df is not None
            and "Email" in self._df
            and "First Name" in self._df
            and "Last Name" in self._df
        )

    def run(self):
        """Add the email analysis columns to the DataFrame and return it."""
        # extract domain from email
        # Possibly add the normalized email here
        self._df[["domain", "email_valid"]] = self._df.apply(
            lambda lead: extract_custom_domain(str(lead["Email"])), axis=1
        )

        self._df[["first_name_in_account", "last_name_in_account"]] = self._df.apply(
            analyze_email_account, axis=1
        )

        # remove commercial domains
        # Assign through .loc on the frame itself: an in-place replace() on the
        # selected column is a chained update that is not guaranteed to write
        # back to the frame, and replace(..., None) has changed meaning between
        # pandas versions.
        is_commercial = self._df["domain"].isin(COMMERCIAL_DOMAINS)
        self._df.loc[is_commercial, "domain"] = None
        return self.df

    def finish(self):
        """Log the share of leads that ended up with a custom domain."""
        total = len(self._df)
        # Guard the empty frame so the percentage is reported as 0 instead of NaN.
        p_custom_domains = (
            self._df["domain"].notna().sum() / total * 100 if total else 0.0
        )
        self.log(f"Percentage of custom domains: {p_custom_domains:.2f}%")
a/src/bdc/steps/enrich_custom_domains.py +++ /dev/null @@ -1,81 +0,0 @@ -# SPDX-License-Identifier: MIT -# SPDX-FileCopyrightText: 2023 Lucca Baumgärtner - -from typing import Optional - -import pandas as pd -from email_validator import EmailNotValidError, validate_email - -from bdc.steps.step import Step - - -class EnrichCustomDomains(Step): - name = "Custom-Domains" - - def load_data(self): - pass - - def verify(self): - return "Email" in self._df - - def run(self): - commercial_domains = [ - "web.de", - "mail.com", - "mail.de", - "msn.com", - "gmail.com", - "yahoo.com", - "hotmail.com", - "aol.com", - "hotmail.co.uk", - "hotmail.fr", - "yahoo.fr", - "live.com", - "gmx.de", - "outlook.com", - "icloud.com", - "outlook.de", - "online.de", - "gmx.net", - "googlemail.com", - "yahoo.de", - "t-online.de", - "gmx.ch", - "gmx.at", - "hotmail.ch", - "live.nl", - "hotmail.de", - "home.nl", - "bluewin.ch", - "freenet.de", - "upcmail.nl", - "zeelandnet.nl", - "hotmail.nl", - "arcor.de", - "aol.de", - "me.com", - "gmail.con", - ] - # extract domain from email - # Possibly add the normalized email here - self._df["domain"] = self._df.apply( - lambda row: self.check_valid_email(str(row["Email"])), axis=1 - ) - - # remove commercial domains - self._df["domain"].replace(commercial_domains, None, inplace=True) - return self.df - - def finish(self): - # print(self.df.head()) - p_custom_domains = self._df["domain"].notna().sum() / len(self._df) * 100 - self.log(f"Percentage of custom domains: {p_custom_domains:.2f}%") - # print(self._df["domain"].value_counts(sort=True)) - - def check_valid_email(self, email: str) -> Optional[str]: - try: - validate_email(email, check_deliverability=False) - return email.split("@")[1] - except EmailNotValidError as e: - return None diff --git a/src/bdc/steps/google_places.py b/src/bdc/steps/google_places.py index 4624a8f..43bd7ff 100644 --- a/src/bdc/steps/google_places.py +++ b/src/bdc/steps/google_places.py @@ -6,6 +6,7 @@ # 
SPDX-FileCopyrightText: 2023 Ruchita Nathani # SPDX-FileCopyrightText: 2023 Ahmed Sheta +import re from http import HTTPStatus import googlemaps @@ -20,7 +21,29 @@ class GooglePlaces(Step): name = "Google_Places" URL = "https://maps.googleapis.com/maps/api/place/textsearch/json?query=" - fields = ["business_status", "formatted_address", "name", "user_ratings_total"] + + # fields that are expected as an output of the df.apply lambda function + df_fields = [ + "place_id", + "business_status", + "formatted_address", + "name", + "user_ratings_total", + "rating", + "price_level", + "no_candidates", + ] + # fields that are accessed directly from the api + api_fields = [ + "place_id", + "business_status", + "formatted_address", + "name", + "user_ratings_total", + "rating", + "price_level", + ] + gmaps = None def load_data(self) -> None: @@ -34,13 +57,15 @@ def verify(self) -> bool: self.df is not None and "Email" in self.df and "domain" in self.df + and "first_name_in_account" in self.df + and "last_name_in_account" in self.df and GOOGLE_PLACES_API_KEY is not None ) - def run(self) -> None: + def run(self) -> pd.DataFrame: tqdm.pandas(desc="Getting info from Places API") self.df[ - [f"{self.name.lower()}_{field}" for field in self.fields] + [f"{self.name.lower()}_{field}" for field in self.df_fields] ] = self.df.progress_apply( lambda lead: self.get_data_from_google_api(lead), axis=1 ) @@ -51,15 +76,27 @@ def finish(self) -> None: def get_data_from_google_api(self, lead_row): """Request Google Places Text Search API""" - error_return_value = pd.Series([None] * len(self.fields)) + error_return_value = pd.Series([None] * len(self.df_fields)) + + search_query = lead_row["domain"] - # Go through each email address entry and remove the domain name (can do this in preprocessing, this is for test) - domain = lead_row["domain"] - if domain is None: + if search_query is None and lead_row["email_valid"]: + account_name = lead_row["Email"].split("@")[0] + if not ( + 
lead_row["first_name_in_account"] and lead_row["last_name_in_account"] + ): + # use account name as search query and replace special characters with whitespace + search_query = re.sub(r"[^a-zA-Z0-9\n]", " ", account_name) + + if search_query is None: + # if account name consists only of first and last name and no custom domain is available, + # skip the search as no results are expected return error_return_value try: - response = self.gmaps.find_place(domain, "textquery", fields=self.fields) + response = self.gmaps.find_place( + search_query, "textquery", fields=self.api_fields + ) # Retrieve response # response = requests.get(self.URL + domain + "&key=" + GOOGLE_PLACES_API_KEY) except RequestException as e: @@ -76,8 +113,14 @@ def get_data_from_google_api(self, lead_row): # Only look at the top result TODO: Check if we can cross check available values to rate results top_result = response["candidates"][0] + no_candidates = len(response["candidates"]) + results_list = [ - top_result[field] if field in top_result else None for field in self.fields + top_result[field] if field in top_result else None + for field in self.api_fields ] + # add number of candidates, which is not a direct field in the api response but can be derived from it + results_list.append(no_candidates) + return pd.Series(results_list) diff --git a/src/demos.py b/src/demos.py index 23bc9de..755e0dd 100644 --- a/src/demos.py +++ b/src/demos.py @@ -8,12 +8,8 @@ from bdc import DataCollector from bdc.pipeline import Pipeline -from bdc.steps import ( - EnrichCustomDomains, - GooglePlaces, - PreprocessPhonenumbers, - ScrapeAddress, -) +from bdc.steps import AnalyzeEmails, GooglePlaces, PreprocessPhonenumbers, ScrapeAddress +from bdc.steps.step import Step from database import get_database from evp import EstimatedValuePredictor @@ -82,9 +78,9 @@ def db_demo(): def pipeline_demo(): - steps = [EnrichCustomDomains()] - input_location = "src/data/sumup_leads_email.csv" - output_location = 
"src/data/leads_enriched.csv" + steps: list[Step] = [AnalyzeEmails()] + input_location = "./data/sumup_leads_email.csv" + output_location = "./data/leads_enriched.csv" try: choice = str(input(f"Run Scrape Address step? (will take a long time) (y/N)\n")) if choice == "y" or choice == "Y":