Merge pull request #27 from chihacknight/update-backfill-aggregations
backfill data 2022-08-08 through 2022-10-08 into public bucket
lauriemerrell authored Oct 17, 2022
2 parents d3fa963 + 71eb59c commit 474d9ee
Showing 7 changed files with 161 additions and 141 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,4 +1,5 @@
.DS_Store
__pycache__/
.ipynb_checkpoints/
venv
.venv
Empty file added data_analysis/__init__.py
Empty file.
3 changes: 2 additions & 1 deletion data_analysis/requirements.txt
@@ -3,4 +3,5 @@ pandas==1.4.3
geopandas==0.11.1
s3fs==2022.7.1
shapely==1.8.4
jupyter==1.0.0
jupyter==1.0.0
typer[all]==0.6.1
144 changes: 20 additions & 124 deletions data_analysis/rt_daily_aggregations.py
@@ -1,149 +1,45 @@
import os
from typing import Tuple
import logging

import boto3
import json
import pandas as pd
import pendulum
from tqdm import tqdm
import typer
from dotenv import load_dotenv
# have to run from root directory for this to work
from scrape_data import combine_daily_files

load_dotenv()
app = typer.Typer()

BUCKET_PUBLIC = os.getenv('BUCKET_PUBLIC', 'chn-ghost-buses-public')
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO)


def compute_hourly_totals(
start_date: str, end_date: str, bucket_type: str = None
) -> Tuple[pd.DataFrame]:
"""Aggregate route data to hourly totals.
Args:
start_date (str): starting date of the date range
end_date (str): ending date of the date range
bucket_type (str, optional): which bucket to pull data from. If
'public', BUCKET_PUBLIC is used. If 'private', BUCKET_PRIVATE.
Defaults to None.
Returns:
Tuple[pd.DataFrame]: a tuple of the data, errors, and combined
hourly summaries DataFrames.
"""
hourly_summary_combined = pd.DataFrame()
@app.command()
def combine_daily_totals(
start_date: str = typer.Argument(..., help = "Start date in format YYYY-MM-DD. Acceptable dates are any date between 2022-05-19 and yesterday (so, if today is 2022-10-11, any date up to 2022-10-10 can be input). Must be before end date."),
end_date: str = typer.Argument(..., help = "End date in format YYYY-MM-DD. Acceptable dates are any date between 2022-05-19 and yesterday (so, if today is 2022-10-11, any date up to 2022-10-10 can be input). Must be after start date."),
bucket_type: str = typer.Argument(..., help = "Either 'public' or 'private' -- bucket to read data from. The user running this must have read access to the bucket in question, so general users should pass 'public'."),
save: str = typer.Argument(..., help = "Optional string 'bucket' or 'local' indicating whether you want to save your combined files in the bucket they're being read from or locally. If empty, files will not be saved.")
):
"""
Take all the raw JSON files for each date in a date range from one of the CHN ghost buses S3 buckets (private or public)
and aggregate them into daily CSV files.
TODO: Add save parameter so these can be saved locally rather than just back to the bucket.
"""

date_range = [
d
for d in pendulum.period(
pendulum.from_format(start_date, "YYYY-MM-DD"),
pendulum.from_format(end_date, "YYYY-MM-DD"),
).range("days")
]
s3 = boto3.resource("s3")
if bucket_type is None or bucket_type == "public":
bucket = s3.Bucket(BUCKET_PUBLIC)
else:
bucket = s3.Bucket(bucket_type)

pbar = tqdm(date_range)
for day in pbar:
date_str = day.to_date_string()
pbar.set_description(
f"Processing {date_str} at {pendulum.now().to_datetime_string()}")
objects = bucket.objects.filter(Prefix=f"bus_data/{date_str}")

logger.info(f"------- loading data at"
f"{pendulum.now().to_datetime_string()}")

# load data
data_dict = {}

# Access denied for public bucket
obj_pbar = tqdm(objects)
for obj in obj_pbar:
obj_pbar.set_description(f"loading {obj}")
obj_name = obj.key
# https://stackoverflow.com/questions/31976273/open-s3-object-as-a-string-with-boto3
obj_body = json.loads(obj.get()["Body"].read().decode("utf-8"))
data_dict[obj_name] = obj_body

# parse data into actual vehicle locations and errors

logger.info(f"------- parsing data at"
f"{pendulum.now().to_datetime_string()}")

data = pd.DataFrame()
errors = pd.DataFrame()

# k, v here are filename: full dict of JSON
data_dict_pbar = tqdm(data_dict.items())
for k, v in data_dict_pbar:
data_dict_pbar.set_description(f"processing {k}")
filename = k
new_data = pd.DataFrame()
new_errors = pd.DataFrame()
# expect ~12 "chunks" per JSON
for chunk, contents in v.items():
if "vehicle" in v[chunk]["bustime-response"].keys():
new_data = new_data.append(
pd.DataFrame(v[chunk]["bustime-response"]["vehicle"])
)
if "error" in v[chunk]["bustime-response"].keys():
new_errors = new_errors.append(
pd.DataFrame(v[chunk]["bustime-response"]["error"])
)
new_data["scrape_file"] = filename
new_errors["scrape_file"] = filename
data = data.append(new_data)
errors = errors.append(new_errors)

logger.info(f"------- saving data at"
f"{pendulum.now().to_datetime_string()}")

if len(errors) > 0:
bucket.put_object(
Body=errors.to_csv(index=False),
Key=f"bus_full_day_errors_v2/{date_str}.csv",
)

if len(data) > 0:
# convert data time to actual datetime
data["data_time"] = pd.to_datetime(
data["tmstmp"], format="%Y%m%d %H:%M")

data["data_hour"] = data.data_time.dt.hour
data["data_date"] = data.data_time.dt.date

bucket.put_object(
Body=data.to_csv(index=False),
Key=f"bus_full_day_data_v2/{date_str}.csv",
)

# combine vids into a set (drops duplicates):
# https://stackoverflow.com/a/45925961
hourly_summary = (
data.groupby(["data_date", "data_hour", "rt", "des"])
.agg({"vid": set, "tatripid": set, "tablockid": set})
.reset_index()
)
# get number of vehicles per hour per route
hourly_summary["vh_count"] = hourly_summary["vid"].apply(len)
hourly_summary["trip_count"] = hourly_summary["tatripid"].apply(
len)
hourly_summary["block_count"] = hourly_summary["tablockid"].apply(
len)

bucket.put_object(
Body=hourly_summary.to_csv(index=False),
Key=f"bus_hourly_summary_v2/{date_str}.csv",
)
pd.concat([hourly_summary_combined, hourly_summary])
return data, errors, hourly_summary

data, errors = combine_daily_files.combine_daily_files(day.to_date_string(), [f'chn-ghost-buses-{bucket_type}'], save)

# can run from the root of the repo like:
# python3 -m data_analysis.rt_daily_aggregations <start_date> <end_date> <bucket_name> <save>
if __name__ == "__main__":
start_date = "2022-07-17"
end_date = "2022-08-07"

data, errors, hourly_summary = compute_hourly_totals(start_date, end_date)
app()
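For orientation, the new CLI above is a thin wrapper around the shared combine_daily_files helper changed in scrape_data/combine_daily_files.py below. A minimal sketch of the equivalent direct call, assuming the repo root as the working directory and read access to the public bucket; the date is illustrative and this snippet is not part of the commit:

# Sketch only: combine one day of raw JSON from the public bucket and keep the CSVs locally.
# save can also be "bucket" (write the daily CSVs back to S3) or None (don't save).
from scrape_data import combine_daily_files

data, errors = combine_daily_files.combine_daily_files(
    "2022-10-01",                 # date string, YYYY-MM-DD (illustrative)
    ["chn-ghost-buses-public"],   # bucket_list, as built from the 'public' bucket_type argument
    save="local",
)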
Empty file added scrape_data/__init__.py
Empty file.
49 changes: 33 additions & 16 deletions scrape_data/combine_daily_files.py
@@ -1,5 +1,6 @@
import os
import logging
from typing import List, Optional

import boto3
import json
@@ -20,15 +21,15 @@
logging.basicConfig(level=logging.INFO)


def combine_daily_files(date: str):
def combine_daily_files(date: str, bucket_list: List[str], save: Optional[str] = None):
"""Combine raw JSON files returned by API into daily CSVs.
Args:
date: Date string for which raw JSON files should be combined into CSVs. Format: YYYY-MM-DD.
bucket_list: List of S3 bucket names to read raw JSON from (and, when save is 'bucket', to write the daily CSVs back to).
save: Optional 'bucket' or 'local' -- where to save the combined daily CSVs. If None, nothing is saved.
"""
s3 = boto3.resource("s3")

for bucket_name in [BUCKET_PRIVATE, BUCKET_PUBLIC]:
for bucket_name in bucket_list:
logging.info(f"processing data from {bucket_name}")
bucket = s3.Bucket(bucket_name)
objects = bucket.objects.filter(Prefix=f"bus_data/{date}")
@@ -77,13 +78,22 @@ def combine_daily_files(date: str):
data = pd.concat(data_list, ignore_index=True)
errors = pd.concat(errors_list, ignore_index=True)

logging.info(f"found {len(errors)} errors and {len(data)} data points for {date}")

if len(errors) > 0:
error_key = f"bus_full_day_errors_v2/{date}.csv"
logging.info(f"saving errors to {bucket}/{error_key}")
bucket.put_object(
Body=errors.to_csv(index=False),
Key=error_key,
)
if save == "bucket":
error_key = f"bus_full_day_errors_v2/{date}.csv"
logging.info(f"saving errors to {bucket}/{error_key}")
bucket.put_object(
Body=errors.to_csv(index=False),
Key=error_key,
)
if save == "local":
local_filename = f"ghost_buses_full_day_errors_from_{bucket.name}_{date}.csv"
logging.info(f"saving errors to {local_filename}")
errors.to_csv(local_filename, index = False)
else:
logging.info(f"no errors found for {date}, not saving any error file")

if len(data) > 0:
# convert data time to actual datetime
@@ -93,15 +103,22 @@ def combine_daily_files(date: str):

data["data_hour"] = data.data_time.dt.hour
data["data_date"] = data.data_time.dt.date
if save == "bucket":
data_key = f"bus_full_day_data_v2/{date}.csv"
logging.info(f"saving data to {bucket}/{data_key}")
bucket.put_object(
Body=data.to_csv(index=False),
Key=data_key,
)
if save == "local":
local_filename = f"ghost_buses_full_day_data_from_{bucket.name}_{date}.csv"
logging.info(f"saving errors to {local_filename}")
data.to_csv(local_filename, index = False)
else:
logging.info(f"no data found for {date}, not saving any data file")

data_key = f"bus_full_day_data_v2/{date}.csv"
logging.info(f"saving data to {bucket}/{data_key}")
bucket.put_object(
Body=data.to_csv(index=False),
Key=data_key,
)

return data, errors

def lambda_handler(event, context):
date = pendulum.yesterday("America/Chicago").to_date_string()
combine_daily_files(date)
data, errors = combine_daily_files(date, [BUCKET_PRIVATE, BUCKET_PUBLIC], save = "bucket")
105 changes: 105 additions & 0 deletions utils/copy_raw_data_to_public_bucket.ipynb
@@ -0,0 +1,105 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "12ebecef",
"metadata": {},
"outputs": [],
"source": [
"import boto3\n",
"import pendulum"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dc86c046",
"metadata": {},
"outputs": [],
"source": [
"s3 = boto3.resource('s3')\n",
"private_bucket = s3.Bucket('chn-ghost-buses-private')\n",
"public_bucket = s3.Bucket('chn-ghost-buses-public')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4b83b078",
"metadata": {},
"outputs": [],
"source": [
"# copy raw data from the private bucket to the public bucket \n",
"start_date = '2022-08-10'\n",
"end_date = '2022-10-08'\n",
"\n",
"date_range = [d for d in pendulum.period(pendulum.from_format(start_date, 'YYYY-MM-DD'), pendulum.from_format(end_date, 'YYYY-MM-DD')).range('days')]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "204bb9d3",
"metadata": {},
"outputs": [],
"source": [
"# copy fully raw JSON data from private to public\n",
"for day in date_range:\n",
" date_str = day.to_date_string()\n",
" print(f\"Processing {date_str} at {pendulum.now().to_datetime_string()}\")\n",
" objects = private_bucket.objects.filter(Prefix = f'bus_data/{date_str}')\n",
" for obj in objects:\n",
" public_bucket.copy({'Bucket': private_bucket.name, 'Key': obj.key}, obj.key)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d2ac2752",
"metadata": {},
"outputs": [],
"source": [
"# copy daily CSVs from private to public\n",
"for day in date_range:\n",
" date_str = day.to_date_string()\n",
" print(f\"Processing {date_str} at {pendulum.now().to_datetime_string()}\")\n",
" data_objects = private_bucket.objects.filter(Prefix = f'bus_full_day_data_v2/{date_str}.csv')\n",
" error_objects = private_bucket.objects.filter(Prefix = f'bus_full_day_errors_v2/{date_str}.csv')\n",
" for obj in data_objects:\n",
" public_bucket.copy({'Bucket': private_bucket.name, 'Key': obj.key}, obj.key)\n",
" for obj in error_objects: \n",
" public_bucket.copy({'Bucket': private_bucket.name, 'Key': obj.key}, obj.key)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "397493c2",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
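Since the point of this backfill is to make the daily CSVs readable from the public bucket, a minimal read-back sketch follows, assuming pandas and s3fs from data_analysis/requirements.txt and AWS credentials with read access to the bucket; the date is illustrative and should fall within the backfilled range:

# Sketch only: load one backfilled daily CSV directly from the public bucket.
import pandas as pd  # s3fs (pinned in requirements.txt) lets pandas read s3:// URLs

df = pd.read_csv("s3://chn-ghost-buses-public/bus_full_day_data_v2/2022-09-01.csv")
print(df[["data_date", "data_hour", "rt", "vid"]].head())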
