Commit 7af0ea5

fix: memory utilization
cka-y committed Aug 29, 2023
1 parent 6294f92 commit 7af0ea5
Showing 1 changed file with 26 additions and 10 deletions.
36 changes: 26 additions & 10 deletions infra/batch/datasets/main.py
@@ -7,7 +7,7 @@
 import requests
 from hashlib import md5
 from sqlalchemy import create_engine, text
-from aiohttp import ClientSession
+from aiohttp import ClientSession, TCPConnector
 import asyncio


@@ -35,8 +35,10 @@ async def upload_dataset(url, bucket_name, stable_id):
     upload_file = False
     if blob.exists():
         # Validate change
-        previous_dataset = blob.download_as_string()
-        latest_hash = md5(previous_dataset).hexdigest()
+        hasher = md5()
+        with blob.open("rb") as stream:
+            # Hash in fixed-size chunks so the whole blob never sits in memory
+            for chunk in iter(lambda: stream.read(4096), b""):
+                hasher.update(chunk)
+        latest_hash = hasher.hexdigest()
         print(f"Latest hash is {latest_hash}.")
         if latest_hash != file_md5_hash:
             upload_file = True
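
An aside on the pattern above: the memory saving comes from hashing the blob in fixed-size chunks instead of materializing the whole object with download_as_string(). A minimal standalone sketch of the same chunked-MD5 pattern against a local file; the path and chunk size are illustrative, not from the repository:

from hashlib import md5

def md5_of_file(path, chunk_size=4096):
    """Compute an MD5 hex digest without loading the whole file into memory."""
    digest = md5()
    with open(path, "rb") as stream:
        # iter() with a sentinel keeps yielding chunks until read() returns b""
        for chunk in iter(lambda: stream.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

print(md5_of_file("dataset.zip"))  # illustrative path

Peak memory stays at roughly one chunk regardless of the file size.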
@@ -57,6 +59,22 @@ async def upload_dataset(url, bucket_name, stable_id):
     return file_md5_hash, None


+async def process_all(engine, bucket_name, results):
+    """
+    Validates and uploads all datasets concurrently, limiting the number of
+    simultaneous connections per host to keep memory utilization bounded.
+    :param engine: SQLAlchemy database engine
+    :param bucket_name: name of the GCP bucket storing the datasets
+    :param results: iterable of (stable_id, producer_url, feed_id) tuples
+    """
+    connector = TCPConnector(limit_per_host=50)  # Limit the connection pool size
+    async with ClientSession(connector=connector) as session:
+        tasks = [
+            validate_dataset_version(engine, producer_url, bucket_name, stable_id, feed_id)
+            for stable_id, producer_url, feed_id in results
+        ]
+        await asyncio.gather(*tasks)
+
+
 async def validate_dataset_version(engine, url, bucket_name, stable_id, feed_id):
     """
     Handles the validation of the dataset including the upload of the dataset to GCP
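
A note on the new process_all: TCPConnector(limit_per_host=50) only caps connections opened through the session it is attached to, and the tasks as shown do not take the session as an argument, so the cap applies only to requests actually routed through that session. A sketch of a complementary way to bound in-flight tasks with asyncio.Semaphore, independent of the HTTP client; the function and parameter names are illustrative, not from the repository:

import asyncio

async def gather_bounded(items, worker, max_concurrency=50):
    """Run worker(item) for every item with at most max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item):
        async with semaphore:  # blocks while max_concurrency tasks are running
            return await worker(item)

    return await asyncio.gather(*(guarded(item) for item in items))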
@@ -123,6 +141,10 @@ async def validate_dataset_version(engine, url, bucket_name, stable_id, feed_id)


 def create_bucket(bucket_name):
+    """
+    Creates the GCP storage bucket with the given name if it does not already exist.
+    :param bucket_name: name of the bucket to create
+    """
     storage_client = storage.Client()
     # Check if the bucket already exists
     bucket = storage_client.lookup_bucket(bucket_name)
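
The remainder of create_bucket is collapsed in this view. For reference, a sketch of the usual look-up-then-create pattern the function presumably completes, assuming the standard google-cloud-storage client; the create_bucket call below is a common completion, not necessarily the file's exact code:

from google.cloud import storage

def create_bucket(bucket_name):
    """Create the GCP bucket only if it does not already exist (idempotent)."""
    storage_client = storage.Client()
    bucket = storage_client.lookup_bucket(bucket_name)  # returns None when absent
    if bucket is None:
        bucket = storage_client.create_bucket(bucket_name)
    return bucket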
@@ -156,12 +178,6 @@ def batch_dataset(request):

     loop = asyncio.new_event_loop()
     asyncio.set_event_loop(loop)
-
-    tasks = [
-        loop.create_task(validate_dataset_version(engine, producer_url, bucket_name, stable_id, feed_id))
-        for stable_id, producer_url, feed_id in results
-    ]
-
-    loop.run_until_complete(asyncio.gather(*tasks))
+    loop.run_until_complete(process_all(engine, bucket_name, results))

     return 'Completed datasets batch processing.'
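
As an aside, since Python 3.7 the explicit loop management above can be condensed with asyncio.run, which creates the loop, runs the coroutine, and closes the loop itself, assuming no event loop is already running in the calling thread:

import asyncio

# Equivalent to the new_event_loop() / set_event_loop() / run_until_complete() sequence
asyncio.run(process_all(engine, bucket_name, results))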
