Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Google API call in the background #5

Merged
merged 3 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ apscheduler>=3.10.4,<4
google-api-python-client>=2.141.0,<3
google-auth-httplib2>=0.2.0,<1
google-auth-oauthlib>=1.2.1,<2
python-slugify>=8.0.4,<9
113 changes: 78 additions & 35 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@
from email.mime.text import MIMEText
from html.parser import HTMLParser
from smtplib import SMTP, SMTPNotSupportedError
from textwrap import dedent

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from azure.core.exceptions import ResourceNotFoundError
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
from watcloud_utils.fastapi import WATcloudFastAPI
from watcloud_utils.logging import logger, set_up_logging

from google_admin_sdk_utils import DirectoryService
from utils import get_azure_table_client, random_str
from utils import get_azure_table_client, random_str, make_azure_table_key


class HTMLTextFilter(HTMLParser):
Expand All @@ -34,15 +34,20 @@ def handle_data(self, data):

@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler.add_job(cleanup, trigger=CronTrigger.from_crontab("* * * * *"))
scheduler.add_job(clean_up, trigger=CronTrigger.from_crontab("* * * * *"))
scheduler.add_job(commit, trigger=CronTrigger.from_crontab("* * * * *"))
yield
scheduler.shutdown()


def healthcheck(app: WATcloudFastAPI):
cleanup_delay_threshold = 120
if time.time() - app.runtime_info["last_cleanup_time"] > cleanup_delay_threshold:
msg = f"Last cleanup was more than {cleanup_delay_threshold} seconds ago."
healthcheck_threshold_sec = 120
if time.time() - app.runtime_info["last_cleanup_time"] > healthcheck_threshold_sec:
msg = f"Last cleanup was more than {healthcheck_threshold_sec} seconds ago."
logger.error(msg)
raise HTTPException(status_code=500, detail=msg)
if time.time() - app.runtime_info["last_commit_time"] > healthcheck_threshold_sec:
msg = f"Last commit was more than {healthcheck_threshold_sec} seconds ago."
logger.error(msg)
raise HTTPException(status_code=500, detail=msg)

Expand All @@ -60,7 +65,9 @@ def healthcheck(app: WATcloudFastAPI):
"num_successful_confirms": 0,
"num_failed_confirms": 0,
"num_expired_signups": 0,
"num_successful_commits": 0,
"last_cleanup_time": time.time(),
"last_commit_time": time.time(),
},
health_fns=[healthcheck],
)
Expand All @@ -71,7 +78,7 @@ class SignUpRequest(BaseModel):
email: str


CODE_TTL_SEC = 15 * 60
CODE_TTL_SEC = 60 * 60 * 24


@app.post("/sign-up")
Expand All @@ -84,14 +91,16 @@ def sign_up(req: SignUpRequest, request: Request):
raise HTTPException(status_code=400, detail="Invalid mailing list")

# Generate a random code
code = random_str(10)
code = random_str(32)

table_client.upsert_entity(
entity={
"PartitionKey": req.mailing_list,
"RowKey": req.email,
"PartitionKey": make_azure_table_key([req.mailing_list]),
"RowKey": make_azure_table_key([req.email, code]),
"CreatedAt": time.time(),
"Code": code,
"ConfirmedAt": 0,
"MailingList": req.mailing_list,
"Email": req.email,
}
)

Expand All @@ -112,7 +121,7 @@ def sign_up(req: SignUpRequest, request: Request):
<body>
<h1>Confirm Your Subscription</h1>
<p>Thanks for signing up for updates from "{req.mailing_list}"!</p>
<p>Please confirm your subscription by clicking the button below. This confirmation email will expire in {CODE_TTL_SEC // 60} minutes.</p>
<p>Please confirm your subscription by clicking the button below. This confirmation email will expire in {int(CODE_TTL_SEC / 60 / 60)} hours.</p>
<a class="confirmation-button" href="{confirmation_url}">Confirm Email</a>
<p>If the button above does not work, please copy and paste the following URL into your browser:</p>
<pre class="monospace-text">{confirmation_url}</pre>
Expand Down Expand Up @@ -178,43 +187,41 @@ def sign_up(req: SignUpRequest, request: Request):

@app.get("/confirm/{mailing_list}/{email}/{code}")
def confirm(mailing_list: str, email: str, code: str):
from azure.core.exceptions import ResourceNotFoundError

"""
Confirm the subscription and schedule the addition to the mailing list.
We schedule the addition instead of adding it immediately to minimize the room
for error in this handler (e.g., network issues when adding to the mailing list).
"""
try:
entity = table_client.get_entity(partition_key=mailing_list, row_key=email)
# update_entity merges the new entity with the existing entity, and throws
# ResourceNotFoundError if the entity does not exist.
table_client.update_entity(
entity={
"PartitionKey": make_azure_table_key([mailing_list]),
"RowKey": make_azure_table_key([email, code]),
"ConfirmedAt": time.time(),
}
)
except ResourceNotFoundError:
app.runtime_info["num_failed_confirms"] += 1
raise HTTPException(status_code=400, detail="Code expired or invalid")

if entity["Code"] != code or time.time() - entity["CreatedAt"] > CODE_TTL_SEC:
app.runtime_info["num_failed_confirms"] += 1
raise HTTPException(status_code=400, detail="Code expired or invalid")

if not directory_service.is_whitelisted_group(mailing_list):
raise HTTPException(
status_code=500, detail="Invalid mailing list found in the database"
)

directory_service.insert_member(mailing_list, email)

# delete the entity
table_client.delete_entity(partition_key=mailing_list, row_key=email)
raise HTTPException(status_code=400, detail="Link expired or invalid. Please sign up again.")

app.runtime_info["num_successful_confirms"] += 1

return {
"status": "ok",
"message": f"Subscription confirmed! '{email}' has been added to the '{mailing_list}' mailing list.",
"message": f"Subscription confirmed! Details: {mailing_list=}, {email=}",
}


@app.post("/cleanup")
def cleanup():
@app.post("/clean-up")
def clean_up():
"""
Clean up expired signups.
"""
# find unconfirmed signups that are older than CODE_TTL_SEC
expired_entities = table_client.query_entities(
query_filter=f"CreatedAt lt @ExpiryTime",
query_filter=f"ConfirmedAt eq 0 and CreatedAt lt @ExpiryTime",
select=["PartitionKey", "RowKey"],
parameters={"ExpiryTime": time.time() - CODE_TTL_SEC},
headers={"Accept": "application/json;odata=nometadata"},
Expand All @@ -228,6 +235,42 @@ def cleanup():

app.runtime_info["num_expired_signups"] += deleted_count
app.runtime_info["last_cleanup_time"] = time.time()
msg = f"cleanup: Deleted {deleted_count} expired signup(s)."
msg = f"clean_up: Deleted {deleted_count} expired signup(s)."
logger.info(msg)
return {"status": "ok", "message": msg}

@app.post("/commit")
def commit():
    """
    Flush confirmed signups from the table into their mailing lists.

    Every row whose ConfirmedAt is greater than 0 is checked against the
    directory whitelist, inserted as a member of its mailing list, and then
    removed from the table. The row is deleted only after the insert call has
    returned, so an error part-way through leaves the row in place for a later
    run. Per the original author's note, adding a member is idempotent, which
    is what makes such a retry safe.

    Returns:
        A dict with "status" and a human-readable "message".

    Raises:
        HTTPException: status 500 when a row names a mailing list that is not
            whitelisted.
    """
    pending_rows = table_client.query_entities(
        query_filter="ConfirmedAt gt 0",
        select=["PartitionKey", "RowKey", "MailingList", "Email"],
        headers={"Accept": "application/json;odata=nometadata"},
    )

    # `processed` ends up as the number of rows iterated; it stays 0 when the
    # query yields nothing.
    processed = 0
    for processed, row in enumerate(pending_rows, start=1):
        target_list, address = row["MailingList"], row["Email"]

        # Sanity check: never push to a list the directory does not whitelist.
        list_is_known = directory_service.is_whitelisted_group(target_list)
        if not list_is_known:
            raise HTTPException(
                status_code=500, detail="Invalid mailing list found in the database"
            )

        directory_service.insert_member(target_list, address)

        # Remove the row only once the member insert has gone through.
        table_client.delete_entity(
            partition_key=row["PartitionKey"],
            row_key=row["RowKey"],
        )

    app.runtime_info["num_successful_commits"] += processed
    app.runtime_info["last_commit_time"] = time.time()

    msg = f"commit: Committed {processed} confirmed signup(s) to the mailing list."
    logger.info(msg)
    return {"status": "ok", "message": msg}
33 changes: 29 additions & 4 deletions src/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import os
import random
import string

from slugify import slugify
from watcloud_utils.logging import logger
from watcloud_utils.typer import app

Expand Down Expand Up @@ -42,14 +45,36 @@ def delete_azure_table(table_name: str):


@app.command()
def random_str(length: int = 10):
def random_str(length: int = 32, chars: str = string.ascii_lowercase):
"""
Generate a random string of the given length.

The default dictionary of characters to choose from is the lowercase alphabet.
"""
return "".join(random.choices(chars, k=length))

@app.command()
def make_azure_table_key(strs: list[str]) -> str:
    r"""
    Generate an Azure Table key from the given strings.

    Each input string is slugified, truncated to an equal share of the
    1024-character budget, and the pieces are concatenated in order.

    The generated key conforms to the following requirements:
    - (azure) up to 1024 characters
    - (azure) does not contain the characters '/', '\', '#', '?', or control characters
      (NOTE(review): this relies on slugify's output alphabet - confirm if
      slugify is ever called with non-default options)
    - (custom) the beginning of each str is guaranteed to be included in the key
    - (custom) the generated key is deterministic for the given input

    Requirements derived from:
    - https://learn.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model

    Raises:
        ValueError: if `strs` is empty, since there is nothing to build a key
            from (previously this surfaced as a bare ZeroDivisionError).
    """
    if not strs:
        raise ValueError("make_azure_table_key requires at least one string")

    # Just a naive implementation for now: split the length budget evenly so
    # that the start of every input survives truncation.
    max_len_per_str = 1024 // len(strs)

    return "".join(slugify(s)[:max_len_per_str] for s in strs)


return "".join(random.choices(string.ascii_letters, k=length))


if __name__ == "__main__":
Expand Down
Loading