Skip to content

Commit

Permalink
Bdc/feature/73 lookup email accounts google (#80)
Browse files Browse the repository at this point in the history
* Further improved the email preprocessing by analyzing the account names in addition to custom domains

Signed-off-by: Lucca Baumgärtner <[email protected]>

* Use email account info for google search query

Signed-off-by: Lucca Baumgärtner <[email protected]>

* add some comments

Signed-off-by: Lucca Baumgärtner <[email protected]>

* fix regex and search_query being None

Signed-off-by: Lucca Baumgärtner <[email protected]>

* query some more fields

Signed-off-by: Lucca Baumgärtner <[email protected]>

---------

Signed-off-by: Lucca Baumgärtner <[email protected]>
  • Loading branch information
luccalb authored Nov 21, 2023
1 parent 4c8230b commit d13ee06
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 100 deletions.
2 changes: 1 addition & 1 deletion src/bdc/steps/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: 2023 Lucca Baumgärtner <[email protected]>

from .enrich_custom_domains import EnrichCustomDomains
from .analyze_emails import AnalyzeEmails
from .google_places import GooglePlaces
from .preprocess_phonenumbers import PreprocessPhonenumbers
from .scrape_address import ScrapeAddress
116 changes: 116 additions & 0 deletions src/bdc/steps/analyze_emails.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: 2023 Lucca Baumgärtner <[email protected]>

import pandas as pd
from email_validator import EmailNotValidError, validate_email

from bdc.steps.step import Step


def extract_custom_domain(email: str) -> pd.Series:
    """Validate an email address and extract its domain part.

    Returns a two-element Series ``[domain, email_valid]`` where ``domain``
    is the part after the ``@`` (or None for invalid addresses) and
    ``email_valid`` is the boolean outcome of the syntax check.
    Deliverability (DNS) checks are deliberately skipped, so only the
    address syntax is validated.
    """
    try:
        validate_email(email, check_deliverability=False)
        return pd.Series([email.split("@")[1], True])
    except EmailNotValidError:
        # Invalid address: no domain can be extracted.
        return pd.Series([None, False])


def analyze_email_account(lead) -> pd.Series:
if not lead["email_valid"]:
return pd.Series([False, False])
email_account = lead["Email"].split("@")[0]
first_name_in_account = (
lead["First Name"].lower() in email_account.lower()
if "First Name" in lead
else False
)
last_name_in_account = (
lead["Last Name"].lower() in email_account.lower()
if "Last Name" in lead
else False
)
return pd.Series([first_name_in_account, last_name_in_account])


class AnalyzeEmails(Step):
    """
    A pipeline step performing various preprocessing steps with the given email address.
    The following columns will be added on successful processing:
    - **domain**: The custom domain name/website if any
    - **email_valid**: Boolean result of email check
    - **first_name_in_account**: Boolean, True if the given first name is part of the email account name
    - **last_name_in_account**: Boolean, True if the given last name is part of the email account name
    """

    name = "Analyze-Emails"

    # Well-known freemail/commercial providers: emails on these domains carry
    # no company information, so run() clears their "domain" value.
    # NOTE(review): "gmail.con" looks like a typo for "gmail.com", but it may
    # deliberately catch mistyped lead data — confirm before removing.
    COMMERCIAL_DOMAINS = [
        "web.de",
        "mail.com",
        "mail.de",
        "msn.com",
        "gmail.com",
        "yahoo.com",
        "hotmail.com",
        "aol.com",
        "hotmail.co.uk",
        "hotmail.fr",
        "yahoo.fr",
        "live.com",
        "gmx.de",
        "outlook.com",
        "icloud.com",
        "outlook.de",
        "online.de",
        "gmx.net",
        "googlemail.com",
        "yahoo.de",
        "t-online.de",
        "gmx.ch",
        "gmx.at",
        "hotmail.ch",
        "live.nl",
        "hotmail.de",
        "home.nl",
        "bluewin.ch",
        "freenet.de",
        "upcmail.nl",
        "zeelandnet.nl",
        "hotmail.nl",
        "arcor.de",
        "aol.de",
        "me.com",
        "gmail.con",
        "office.de",
        "my.com",
    ]

    def load_data(self):
        # Nothing to load: this step only derives columns from the dataframe.
        pass

    def verify(self):
        """Return True when all input columns required by run() are present."""
        return (
            self._df is not None
            and "Email" in self._df
            and "First Name" in self._df
            and "Last Name" in self._df
        )

    def run(self):
        """Derive email-based columns and strip commercial (freemail) domains."""
        # extract domain from email
        # Possibly add the normalized email here
        self._df[["domain", "email_valid"]] = self._df.apply(
            lambda lead: extract_custom_domain(str(lead["Email"])), axis=1
        )

        self._df[["first_name_in_account", "last_name_in_account"]] = self._df.apply(
            lambda lead: analyze_email_account(lead), axis=1
        )

        # Remove commercial domains. Assign the result back instead of using
        # replace(..., inplace=True) on a column, which is deprecated in
        # pandas and a chained-assignment hazard.
        self._df["domain"] = self._df["domain"].replace(
            self.COMMERCIAL_DOMAINS, None
        )
        # NOTE(review): returns self.df while the rest of the class uses
        # self._df — presumably a property on Step; confirm they are the same.
        return self.df

    def finish(self):
        """Log the share of leads that have a custom (non-commercial) domain."""
        # Guard against an empty dataframe to avoid ZeroDivisionError.
        if len(self._df) == 0:
            self.log("Percentage of custom domains: n/a (no leads)")
            return
        p_custom_domains = self._df["domain"].notna().sum() / len(self._df) * 100
        self.log(f"Percentage of custom domains: {p_custom_domains:.2f}%")
81 changes: 0 additions & 81 deletions src/bdc/steps/enrich_custom_domains.py

This file was deleted.

61 changes: 52 additions & 9 deletions src/bdc/steps/google_places.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# SPDX-FileCopyrightText: 2023 Ruchita Nathani <[email protected]>
# SPDX-FileCopyrightText: 2023 Ahmed Sheta <[email protected]>

import re
from http import HTTPStatus

import googlemaps
Expand All @@ -20,7 +21,29 @@
class GooglePlaces(Step):
name = "Google_Places"
URL = "https://maps.googleapis.com/maps/api/place/textsearch/json?query="
fields = ["business_status", "formatted_address", "name", "user_ratings_total"]

# fields that are expected as an output of the df.apply lambda function
df_fields = [
"place_id",
"business_status",
"formatted_address",
"name",
"user_ratings_total",
"rating",
"price_level",
"no_candidates",
]
# fields that are accessed directly from the api
api_fields = [
"place_id",
"business_status",
"formatted_address",
"name",
"user_ratings_total",
"rating",
"price_level",
]

gmaps = None

def load_data(self) -> None:
Expand All @@ -34,13 +57,15 @@ def verify(self) -> bool:
self.df is not None
and "Email" in self.df
and "domain" in self.df
and "first_name_in_account" in self.df
and "last_name_in_account" in self.df
and GOOGLE_PLACES_API_KEY is not None
)

def run(self) -> None:
def run(self) -> pd.DataFrame:
tqdm.pandas(desc="Getting info from Places API")
self.df[
[f"{self.name.lower()}_{field}" for field in self.fields]
[f"{self.name.lower()}_{field}" for field in self.df_fields]
] = self.df.progress_apply(
lambda lead: self.get_data_from_google_api(lead), axis=1
)
Expand All @@ -51,15 +76,27 @@ def finish(self) -> None:

def get_data_from_google_api(self, lead_row):
"""Request Google Places Text Search API"""
error_return_value = pd.Series([None] * len(self.fields))
error_return_value = pd.Series([None] * len(self.df_fields))

search_query = lead_row["domain"]

# Go through each email address entry and remove the domain name (can do this in preprocessing, this is for test)
domain = lead_row["domain"]
if domain is None:
if search_query is None and lead_row["email_valid"]:
account_name = lead_row["Email"].split("@")[0]
if not (
lead_row["first_name_in_account"] and lead_row["last_name_in_account"]
):
# use account name as search query and replace special characters with whitespace
search_query = re.sub(r"[^a-zA-Z0-9\n]", " ", account_name)

if search_query is None:
# if account name consists only of first and last name and no custom domain is available,
# skip the search as no results are expected
return error_return_value

try:
response = self.gmaps.find_place(domain, "textquery", fields=self.fields)
response = self.gmaps.find_place(
search_query, "textquery", fields=self.api_fields
)
# Retrieve response
# response = requests.get(self.URL + domain + "&key=" + GOOGLE_PLACES_API_KEY)
except RequestException as e:
Expand All @@ -76,8 +113,14 @@ def get_data_from_google_api(self, lead_row):
# Only look at the top result TODO: Check if we can cross check available values to rate results
top_result = response["candidates"][0]

no_candidates = len(response["candidates"])

results_list = [
top_result[field] if field in top_result else None for field in self.fields
top_result[field] if field in top_result else None
for field in self.api_fields
]

# add number of candidates, which is not a direct field in the api response but can be derived from it
results_list.append(no_candidates)

return pd.Series(results_list)
14 changes: 5 additions & 9 deletions src/demos.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@

from bdc import DataCollector
from bdc.pipeline import Pipeline
from bdc.steps import (
EnrichCustomDomains,
GooglePlaces,
PreprocessPhonenumbers,
ScrapeAddress,
)
from bdc.steps import AnalyzeEmails, GooglePlaces, PreprocessPhonenumbers, ScrapeAddress
from bdc.steps.step import Step
from database import get_database
from evp import EstimatedValuePredictor

Expand Down Expand Up @@ -82,9 +78,9 @@ def db_demo():


def pipeline_demo():
steps = [EnrichCustomDomains()]
input_location = "src/data/sumup_leads_email.csv"
output_location = "src/data/leads_enriched.csv"
steps: list[Step] = [AnalyzeEmails()]
input_location = "./data/sumup_leads_email.csv"
output_location = "./data/leads_enriched.csv"
try:
choice = str(input(f"Run Scrape Address step? (will take a long time) (y/N)\n"))
if choice == "y" or choice == "Y":
Expand Down

0 comments on commit d13ee06

Please sign in to comment.