backfill data 2022-08-08 through 2022-10-08 into public bucket #27

Merged: 6 commits (Oct 17, 2022)
Changes from 4 commits
1 change: 1 addition & 0 deletions .gitignore
@@ -1,4 +1,5 @@
.DS_Store
__pycache__/
.ipynb_checkpoints/
venv
.venv
Empty file added data_analysis/__init__.py
3 changes: 2 additions & 1 deletion data_analysis/requirements.txt
@@ -3,4 +3,5 @@ pandas==1.4.3
geopandas==0.11.1
s3fs==2022.7.1
shapely==1.8.4
jupyter==1.0.0
typer[all]==0.6.1
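
typer[all] is the new dependency behind the rewritten backfill CLI in data_analysis/rt_daily_aggregations.py below. A minimal sketch of the pattern, assuming only what that diff shows (argument names come from the PR; help text is shortened and the body is a placeholder, so this is illustrative rather than the actual file):

import typer

app = typer.Typer()

@app.command()
def combine_daily_totals(
    start_date: str = typer.Argument(..., help="Start date, YYYY-MM-DD"),
    end_date: str = typer.Argument(..., help="End date, YYYY-MM-DD"),
    bucket_type: str = typer.Argument(..., help="'public' or 'private'"),
):
    # Placeholder body; the real command aggregates raw JSON into daily CSVs per day.
    typer.echo(f"Would aggregate {start_date} through {end_date} from the {bucket_type} bucket")

if __name__ == "__main__":
    app()
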
139 changes: 16 additions & 123 deletions data_analysis/rt_daily_aggregations.py
@@ -1,149 +1,42 @@
import os
from typing import Tuple
import logging

import boto3
import json
import pandas as pd
import pendulum
from tqdm import tqdm
import typer
from dotenv import load_dotenv
# have to run from root directory for this to work
from scrape_data import combine_daily_files

load_dotenv()
app = typer.Typer()

BUCKET_PUBLIC = os.getenv('BUCKET_PUBLIC', 'chn-ghost-buses-public')
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO)


def compute_hourly_totals(
start_data: str, end_date: str, bucket_type: str = None
) -> Tuple[pd.DataFrame]:
"""Aggregate route data to hourly totals.

Args:
start_data (str): starting date of the route
end_date (str): ending date of route
bucket_type (str, optional): which bucket to pull data from. If
'public', BUCKET_PUBLIC is used. If 'private', BUCKET_PRIVATE.
Defaults to None.

Returns:
Tuple[pd.DataFrame]: a tuple of the data, errors, and combined
hourly summaries DataFrames.
    """
    hourly_summary_combined = pd.DataFrame()

@app.command()
def combine_daily_totals(
    start_date: str = typer.Argument(..., help = "Start date in format YYYY-MM-DD. Acceptable dates are any date between 2022-05-19 and yesterday (so, if today is 2022-10-11, can input any date up to 2022-10-10. Must be before end date."),
    end_date: str = typer.Argument(..., help = "End date in format YYYY-MM-DD. Acceptable dates are any date between 2022-05-19 and yesterday (so, if today is 2022-10-11, can input any date up to 2022-10-10. Must be after start date."),
    bucket_type: str = typer.Argument(..., help = "Either 'public' or 'private' -- bucket to read data from. User running must have read access to the bucket in question, so general users should list 'public'.")):
    """
    Take all the raw JSON files for each date in a date range from one of the CHN ghost buses S3 buckets (private or public)
    and aggregate them into daily CSV files.
    TODO: Add save parameter so these can be saved locally rather than just back to the bucket.
    """

date_range = [
d
for d in pendulum.period(
pendulum.from_format(start_date, "YYYY-MM-DD"),
pendulum.from_format(end_date, "YYYY-MM-DD"),
).range("days")
]
s3 = boto3.resource("s3")
if bucket_type is None or bucket_type == "public":
bucket = s3.Bucket(BUCKET_PUBLIC)
else:
bucket = s3.Bucket(bucket_type)

pbar = tqdm(date_range)
for day in pbar:
date_str = day.to_date_string()
pbar.set_description(
f"Processing {date_str} at {pendulum.now().to_datetime_string()}")
objects = bucket.objects.filter(Prefix=f"bus_data/{date_str}")

logger.info(f"------- loading data at"
f"{pendulum.now().to_datetime_string()}")

# load data
data_dict = {}

# Access denied for public bucket
obj_pbar = tqdm(objects)
for obj in obj_pbar:
obj_pbar.set_description(f"loading {obj}")
obj_name = obj.key
# https://stackoverflow.com/questions/31976273/open-s3-object-as-a-string-with-boto3
obj_body = json.loads(obj.get()["Body"].read().decode("utf-8"))
data_dict[obj_name] = obj_body

# parse data into actual vehicle locations and errors

logger.info(f"------- parsing data at"
f"{pendulum.now().to_datetime_string()}")

data = pd.DataFrame()
errors = pd.DataFrame()

# k, v here are filename: full dict of JSON
data_dict_pbar = tqdm(data_dict.items())
for k, v in data_dict_pbar:
data_dict_pbar.set_description(f"processing {k}")
filename = k
new_data = pd.DataFrame()
new_errors = pd.DataFrame()
# expect ~12 "chunks" per JSON
for chunk, contents in v.items():
if "vehicle" in v[chunk]["bustime-response"].keys():
new_data = new_data.append(
pd.DataFrame(v[chunk]["bustime-response"]["vehicle"])
)
if "error" in v[chunk]["bustime-response"].keys():
new_errors = new_errors.append(
pd.DataFrame(v[chunk]["bustime-response"]["error"])
)
new_data["scrape_file"] = filename
new_errors["scrape_file"] = filename
data = data.append(new_data)
errors = errors.append(new_errors)

logger.info(f"------- saving data at"
f"{pendulum.now().to_datetime_string()}")

if len(errors) > 0:
bucket.put_object(
Body=errors.to_csv(index=False),
Key=f"bus_full_day_errors_v2/{date_str}.csv",
)

if len(data) > 0:
# convert data time to actual datetime
data["data_time"] = pd.to_datetime(
data["tmstmp"], format="%Y%m%d %H:%M")

data["data_hour"] = data.data_time.dt.hour
data["data_date"] = data.data_time.dt.date

bucket.put_object(
Body=data.to_csv(index=False),
Key=f"bus_full_day_data_v2/{date_str}.csv",
)

# combine vids into a set (drops duplicates):
# https://stackoverflow.com/a/45925961
hourly_summary = (
data.groupby(["data_date", "data_hour", "rt", "des"])
.agg({"vid": set, "tatripid": set, "tablockid": set})
.reset_index()
)
# get number of vehicles per hour per route
hourly_summary["vh_count"] = hourly_summary["vid"].apply(len)
hourly_summary["trip_count"] = hourly_summary["tatripid"].apply(
len)
hourly_summary["block_count"] = hourly_summary["tablockid"].apply(
len)

bucket.put_object(
Body=hourly_summary.to_csv(index=False),
Key=f"bus_hourly_summary_v2/{date_str}.csv",
)
pd.concat([hourly_summary_combined, hourly_summary])
return data, errors, hourly_summary
combine_daily_files.combine_daily_files(day.to_date_string(), [f'chn-ghost-buses-{bucket_type}'])


if __name__ == "__main__":
start_date = "2022-07-17"
end_date = "2022-08-07"

data, errors, hourly_summary = compute_hourly_totals(start_date, end_date)
app()

Review comment (Collaborator):

Running python data_analysis/rt_daily_aggregations.py from the project root will throw a ModuleNotFoundError: No module named 'scrape_data'. For this to work, you'll have to create a root-level module that imports rt_daily_aggregations. You can create a module called application.py, for example, and then add

from data_analysis import rt_daily_aggregations

rt_daily_aggregations.app()

to it. Then running python application.py from the project root will work.

Therefore, I would remove the if __name__ == '__main__' block because this isn't meant to be run from the command line.

Reply (Member Author, @lauriemerrell, Oct 12, 2022):

I was running it as python3 -m data_analysis.rt_daily_aggregations arg arg arg from the root directory and it did work... Open to changes, but this was convenient for me when I was actually using it to backfill. Does that make sense? I can also try to document this somewhere.

Reply (Collaborator):

I see. I didn't have the -m flag, so the relative imports didn't work. Leaving it as-is is fine, but it would be helpful to document the -m flag.
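
To consolidate the two ways of running the backfill discussed above, a sketch of the optional root-level wrapper the reviewer suggests, with the direct -m invocation noted in a comment (the dates and bucket argument are illustrative, taken from this PR's title and docstring):

# application.py at the project root (optional wrapper, per the review above).
# Without it, run the module from the project root with the -m flag, e.g.:
#   python3 -m data_analysis.rt_daily_aggregations 2022-08-08 2022-10-08 public
from data_analysis import rt_daily_aggregations

rt_daily_aggregations.app()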

Empty file added scrape_data/__init__.py
7 changes: 4 additions & 3 deletions scrape_data/combine_daily_files.py
@@ -1,5 +1,6 @@
import os
import logging
from typing import List

import boto3
import json
@@ -20,15 +21,15 @@
logging.basicConfig(level=logging.INFO)


def combine_daily_files(date: str):
def combine_daily_files(date: str, bucket_list: List[str]):
"""Combine raw JSON files returned by API into daily CSVs.

Args:
date: Date string for which raw JSON files should be combined into CSVs. Format: YYYY-MM-DD.
"""
s3 = boto3.resource("s3")

for bucket_name in [BUCKET_PRIVATE, BUCKET_PUBLIC]:
for bucket_name in bucket_list:
logging.info(f"processing data from {bucket_name}")
bucket = s3.Bucket(bucket_name)
objects = bucket.objects.filter(Prefix=f"bus_data/{date}")
@@ -104,4 +105,4 @@ def combine_daily_files(date: str):

def lambda_handler(event, context):
date = pendulum.yesterday("America/Chicago").to_date_string()
combine_daily_files(date)
combine_daily_files(date, [BUCKET_PRIVATE, BUCKET_PUBLIC])
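
A short sketch of the two call sites after this change (the import path and bucket constants are assumed from the diffs in this PR; the date is illustrative, and actually running it requires AWS read access to the buckets):

from scrape_data.combine_daily_files import combine_daily_files, BUCKET_PRIVATE, BUCKET_PUBLIC

# Scheduled Lambda path: still combines yesterday's files for both buckets.
combine_daily_files("2022-10-08", [BUCKET_PRIVATE, BUCKET_PUBLIC])

# Backfill CLI path: combines files for only the bucket the user names,
# using the chn-ghost-buses-{bucket_type} pattern from rt_daily_aggregations.py.
bucket_type = "public"
combine_daily_files("2022-10-08", [f"chn-ghost-buses-{bucket_type}"])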
105 changes: 105 additions & 0 deletions utils/copy_raw_data_to_public_bucket.ipynb
@@ -0,0 +1,105 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "12ebecef",
"metadata": {},
"outputs": [],
"source": [
"import boto3\n",
"import pendulum"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dc86c046",
"metadata": {},
"outputs": [],
"source": [
"s3 = boto3.resource('s3')\n",
"private_bucket = s3.Bucket('chn-ghost-buses-private')\n",
"public_bucket = s3.Bucket('chn-ghost-buses-public')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4b83b078",
"metadata": {},
"outputs": [],
"source": [
"# copy raw data from the private bucket to the public bucket \n",
"start_date = '2022-08-10'\n",
"end_date = '2022-10-08'\n",
"\n",
"date_range = [d for d in pendulum.period(pendulum.from_format(start_date, 'YYYY-MM-DD'), pendulum.from_format(end_date, 'YYYY-MM-DD')).range('days')]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "204bb9d3",
"metadata": {},
"outputs": [],
"source": [
"# copy fully raw JSON data from private to public\n",
"for day in date_range:\n",
" date_str = day.to_date_string()\n",
" print(f\"Processing {date_str} at {pendulum.now().to_datetime_string()}\")\n",
" objects = private_bucket.objects.filter(Prefix = f'bus_data/{date_str}')\n",
" for obj in objects:\n",
" public_bucket.copy({'Bucket': private_bucket.name, 'Key': obj.key}, obj.key)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d2ac2752",
"metadata": {},
"outputs": [],
"source": [
"# copy daily CSVs from private to public\n",
"for day in date_range:\n",
" date_str = day.to_date_string()\n",
" print(f\"Processing {date_str} at {pendulum.now().to_datetime_string()}\")\n",
" data_objects = private_bucket.objects.filter(Prefix = f'bus_full_day_data_v2/{date_str}.csv')\n",
" error_objects = private_bucket.objects.filter(Prefix = f'bus_full_day_errors_v2/{date_str}.csv')\n",
" for obj in data_objects:\n",
" public_bucket.copy({'Bucket': private_bucket.name, 'Key': obj.key}, obj.key)\n",
" for obj in error_objects: \n",
" public_bucket.copy({'Bucket': private_bucket.name, 'Key': obj.key}, obj.key)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "397493c2",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}