feat: testing async function for faster processing
cka-y committed Aug 28, 2023
1 parent 7e2f7f1 commit f054663
Showing 2 changed files with 110 additions and 95 deletions.
201 changes: 107 additions & 94 deletions infra/batch/datasets/main.py
@@ -7,51 +7,116 @@
import requests
from hashlib import md5
from sqlalchemy import create_engine, text
from aiohttp import ClientSession
import asyncio


def upload_dataset(url, bucket_name, stable_id):
async def upload_dataset(url, bucket_name, stable_id):
"""
Uploads a dataset to a GCP bucket
:param url: dataset feed's producer url
:param bucket_name: name of the GCP bucket
:param stable_id: the dataset stable id
:return: the file hash and the hosted url as a tuple
"""
# Retrieve data
headers = {'User-Agent': 'Mozilla/5.0'}
response = requests.get(url, stream=True, headers=headers)
content = response.content
file_md5_hash = md5(content).hexdigest()
print(f"File hash is {file_md5_hash}.")

# Create a storage client
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(f"{stable_id}/latest.zip")

upload_file = False
if blob.exists():
# Validate change
previous_dataset = blob.download_as_string()
latest_hash = md5(previous_dataset).hexdigest()
print(f"Latest hash is {latest_hash}.")
if latest_hash != file_md5_hash:
upload_file = True
else:
# Upload first version of dataset
upload_file = True

if upload_file:
# Upload file as latest
blob.upload_from_string(content)
async with ClientSession() as session:
# Retrieve data
headers = {'User-Agent': 'Mozilla/5.0'}
async with session.get(url, headers=headers) as response:
content = await response.read()
file_md5_hash = md5(content).hexdigest()
print(f"File hash is {file_md5_hash}.")

# Create a storage client
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(f"{stable_id}/latest.zip")

upload_file = False
if blob.exists():
# Validate change
previous_dataset = blob.download_as_string()
latest_hash = md5(previous_dataset).hexdigest()
print(f"Latest hash is {latest_hash}.")
if latest_hash != file_md5_hash:
upload_file = True
else:
# Upload first version of dataset
upload_file = True

if upload_file:
# Upload file as latest
blob.upload_from_string(content)

# Upload file as upload timestamp
current_time = datetime.now()
timestamp = current_time.strftime("%Y%m%d")
timestamp_blob = bucket.blob(f"{stable_id}/{timestamp}.zip")
timestamp_blob.upload_from_string(content)
return file_md5_hash, timestamp_blob.public_url
return file_md5_hash, None
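
# --- Illustrative usage sketch; not part of the committed file ---
# A minimal way to drive the new coroutine for a single feed. The URL, bucket
# name and stable id below are hypothetical placeholders.
async def refresh_one_feed_example():
    file_hash, hosted_url = await upload_dataset(
        "https://example.com/gtfs.zip",  # hypothetical producer URL
        "example-datasets-bucket",       # hypothetical bucket name
        "mdb-123",                       # hypothetical stable id
    )
    print(f"Uploaded {file_hash} -> {hosted_url}")
# Entry point for the sketch: asyncio.run(refresh_one_feed_example())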


async def validate_dataset_version(engine, url, bucket_name, stable_id, feed_id):
"""
Handles the validation of the dataset including the upload of the dataset to GCP
and the required database changes
:param engine: Database engine
:param url: Producer URL
:param bucket_name: GCP bucket name
:param stable_id: Feed's stable ID
:param feed_id: Feed's ID
"""
transaction = None
connection = None
errors = ""
try:
md5_file_hash, hosted_url = await upload_dataset(url, bucket_name, stable_id)

# Set up transaction for SQL updates
connection = engine.connect()
transaction = connection.begin()

# Create a new version of the dataset in the database
select_dataset_statement = text(f"select id, hash from gtfsdataset where latest=true and feed_id='{feed_id}'")
dataset_results = connection.execute(select_dataset_statement).all()
dataset_id = dataset_results[0][0] if len(dataset_results) > 0 else None
dataset_hash = dataset_results[0][1] if len(dataset_results) > 0 else None
print(f"Dataset ID = {dataset_id}, Dataset Hash = {dataset_hash}")
if dataset_id is None:
errors += f"Couldn't find latest dataset related to feed_id {feed_id}\n"
return

# Set the previous version latest field to false
if dataset_hash is not None and dataset_hash != md5_file_hash:
sql_statement = f"update gtfsdataset set latest=false where id='{dataset_id}'"
connection.execute(text(sql_statement))

# Upload file as upload timestamp
current_time = datetime.now()
timestamp = current_time.strftime("%Y%m%d%H%M%S")
timestamp_blob = bucket.blob(f"{stable_id}/{timestamp}.zip")
timestamp_blob.upload_from_string(content)
return file_md5_hash, timestamp_blob.public_url
return file_md5_hash, None
sql_statement = f"insert into gtfsdataset (id, feed_id, latest, bounding_box, note, hash, " \
f"download_date, stable_id, hosted_url) " \
f"select '{str(uuid.uuid4())}', feed_id, true, bounding_box, note, " \
f"'{md5_file_hash}', NOW(), stable_id, '{hosted_url}' from " \
f"gtfsdataset where id='{dataset_id}'"

# In case the dataset doesn't include a hash or the dataset was deleted from the bucket,
# update the existing entity
if dataset_hash is None or dataset_hash == md5_file_hash:
sql_statement = f"update gtfsdataset set hash='{md5_file_hash}' where id='{dataset_id}'"
connection.execute(text(sql_statement))

# Commit transaction after every step has run successfully
transaction.commit()
except Exception as e:
if transaction is not None:
transaction.rollback()
errors += f"[ERROR]: {e}\n"

finally:
# TODO upload logs to gcp
if len(errors) > 0:
print(f"Logging errors for stable_id = {stable_id}:\n{errors}")
if connection is not None:
connection.close()
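
# Sketch (not part of the committed file): the same latest-dataset lookup written
# with bound parameters instead of f-string interpolation, reusing the `text`
# construct imported above; feed_id is supplied by the caller.
def select_latest_dataset(connection, feed_id):
    stmt = text("select id, hash from gtfsdataset where latest=true and feed_id=:feed_id")
    return connection.execute(stmt, {"feed_id": feed_id}).all()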


def create_bucket(bucket_name):
@@ -66,17 +131,6 @@ def create_bucket(bucket_name):
print(f'Bucket {bucket_name} already exists.')


def create_test_file(bucket_name, file_name):
# Create a storage client
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(file_name)

# Write data to the blob
blob.upload_from_string('Changing the content of the file test')


# Register an HTTP function with the Functions Framework
@functions_framework.http
def batch_dataset(request):
bucket_name = os.getenv("BUCKET_NAME")
@@ -97,54 +151,13 @@ def batch_dataset(request):
results = engine.execute(text(sql_statement)).all()
print(f"Retrieved {len(results)} active feeds.")

errors = "" # Contains datasets that couldn't be processed
transaction = None
for result in results:
stable_id = result[0]
producer_url = result[1]
feed_id = result[2]
try:
md5_file_hash, hosted_url = upload_dataset(producer_url, bucket_name, stable_id)

# Set up transaction for SQL updates
connection = engine.connect()
transaction = connection.begin()

# Create a new version of the dataset in the database
select_dataset_statement = text(f"select id, hash from gtfsdataset where latest=true and feed_id='{feed_id}'")
dataset_results = connection.execute(select_dataset_statement).all()
dataset_id = dataset_results[0][0] if len(dataset_results) > 0 else None
dataset_hash = dataset_results[0][1] if len(dataset_results) > 0 else None
print(f"Dataset ID = {dataset_id}, Dataset Hash = {dataset_hash}")
if dataset_id is None:
errors += f"Couldn't find latest dataset related to feed_id {feed_id}\n"
continue

# Set the previous version latest field to false
if dataset_hash is not None and dataset_hash != md5_file_hash:
sql_statement = f"update gtfsdataset set latest=false where id='{dataset_id}'"
connection.execute(text(sql_statement))

sql_statement = f"insert into gtfsdataset (id, feed_id, latest, bounding_box, note, hash, " \
f"download_date, stable_id, hosted_url) " \
f"select '{str(uuid.uuid4())}', feed_id, true, bounding_box, note, " \
f"'{md5_file_hash}', NOW(), stable_id, '{hosted_url}' from " \
f"gtfsdataset where id='{dataset_id}'"

# In case the dataset doesn't include a hash or the dataset was deleted from the bucket,
# update the existing entity
if dataset_hash is None or dataset_hash == md5_file_hash:
sql_statement = f"update gtfsdataset set hash='{md5_file_hash}' where id='{dataset_id}'"
connection.execute(text(sql_statement))
loop = asyncio.get_event_loop()

tasks = [
loop.create_task(validate_dataset_version(engine, producer_url, bucket_name, stable_id, feed_id))
for stable_id, producer_url, feed_id in results
]

# Commit transaction after every step has run successfully
transaction.commit()
loop.run_until_complete(asyncio.gather(*tasks))

except Exception as e:
if transaction is not None:
transaction.rollback()
errors += f"[ERROR] While updating dataset with stable_id = {stable_id}\n{e}\n"
continue
# TODO upload logs to gcp
print(f"Logging errors:\n{errors}")
return 'Completed datasets batch processing.'
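
The core pattern this commit introduces, fetching each feed with aiohttp and fanning the per-feed work out with asyncio.gather, is sketched below in isolation. fetch_and_hash and the URL list are hypothetical stand-ins for upload_dataset / validate_dataset_version and the feed query above; unlike the committed code, which opens one ClientSession per feed, the sketch shares a single session.

import asyncio
from hashlib import md5
from aiohttp import ClientSession

async def fetch_and_hash(session, url):
    # Download one feed and return its MD5, mirroring the hashing step in upload_dataset.
    async with session.get(url, headers={'User-Agent': 'Mozilla/5.0'}) as response:
        content = await response.read()
    return md5(content).hexdigest()

async def process_all(urls):
    # gather schedules all downloads concurrently on the shared session.
    async with ClientSession() as session:
        return await asyncio.gather(*(fetch_and_hash(session, u) for u in urls))

# hashes = asyncio.run(process_all(["https://example.com/gtfs.zip"]))  # hypothetical URL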
4 changes: 3 additions & 1 deletion infra/batch/datasets/requirements.txt
@@ -1,4 +1,6 @@
functions-framework==3.*
google-cloud-storage
SQLAlchemy==1.4.49
psycopg2-binary==2.9.6
psycopg2-binary==2.9.6
aiohttp
asyncio
