forked from chihacknight/chn-ghost-buses
Commit
Merge pull request chihacknight#27 from chihacknight/update-backfill-aggregations

backfill data 2022-08-08 through 2022-10-08 into public bucket
Showing 7 changed files with 161 additions and 141 deletions.
```diff
@@ -1,4 +1,5 @@
 .DS_Store
 __pycache__/
 .ipynb_checkpoints/
 venv
+.venv
```
Empty file.
```diff
@@ -3,4 +3,5 @@ pandas==1.4.3
 geopandas==0.11.1
 s3fs==2022.7.1
 shapely==1.8.4
-jupyter==1.0.0
\ No newline at end of file
+jupyter==1.0.0
+typer[all]==0.6.1
```
The main change replaces the in-module `compute_hourly_totals()` aggregation with a `typer` CLI that delegates the per-day work to `scrape_data.combine_daily_files`:

```diff
@@ -1,149 +1,45 @@
 import os
 from typing import Tuple
 import logging
 
 import boto3
 import json
 import pandas as pd
 import pendulum
 from tqdm import tqdm
+import typer
 from dotenv import load_dotenv
+# have to run from root directory for this to work
+from scrape_data import combine_daily_files
 
 load_dotenv()
+app = typer.Typer()
 
 BUCKET_PUBLIC = os.getenv('BUCKET_PUBLIC', 'chn-ghost-buses-public')
 logger = logging.getLogger()
 logging.basicConfig(level=logging.INFO)
 
 
-def compute_hourly_totals(
-    start_data: str, end_date: str, bucket_type: str = None
-) -> Tuple[pd.DataFrame]:
-    """Aggregate route data to hourly totals.
-    Args:
-        start_data (str): starting date of the route
-        end_date (str): ending date of route
-        bucket_type (str, optional): which bucket to pull data from. If
-            'public', BUCKET_PUBLIC is used. If 'private', BUCKET_PRIVATE.
-            Defaults to None.
-    Returns:
-        Tuple[pd.DataFrame]: a tuple of the data, errors, and combined
-            hourly summaries DataFrames.
+@app.command()
+def combine_daily_totals(
+    start_date: str = typer.Argument(..., help="Start date in format YYYY-MM-DD. Acceptable dates are any date between 2022-05-19 and yesterday (so, if today is 2022-10-11, you can input any date up to 2022-10-10). Must be before end date."),
+    end_date: str = typer.Argument(..., help="End date in format YYYY-MM-DD. Acceptable dates are any date between 2022-05-19 and yesterday (so, if today is 2022-10-11, you can input any date up to 2022-10-10). Must be after start date."),
+    bucket_type: str = typer.Argument(..., help="Either 'public' or 'private' -- bucket to read data from. The user running this must have read access to the bucket in question, so general users should pass 'public'."),
+    save: str = typer.Argument(..., help="Optional string 'bucket' or 'local' indicating whether to save the combined files in the bucket they are read from or locally. If empty, files will not be saved.")
+):
     """
-    hourly_summary_combined = pd.DataFrame()
+    Take all the raw JSON files for each date in a date range from one of the
+    CHN ghost buses S3 buckets (private or public) and aggregate them into
+    daily CSV files.
+    """
 
     date_range = [
         d
         for d in pendulum.period(
             pendulum.from_format(start_date, "YYYY-MM-DD"),
             pendulum.from_format(end_date, "YYYY-MM-DD"),
         ).range("days")
     ]
-    s3 = boto3.resource("s3")
-    if bucket_type is None or bucket_type == "public":
-        bucket = s3.Bucket(BUCKET_PUBLIC)
-    else:
-        bucket = s3.Bucket(bucket_type)
 
     pbar = tqdm(date_range)
     for day in pbar:
         date_str = day.to_date_string()
         pbar.set_description(
             f"Processing {date_str} at {pendulum.now().to_datetime_string()}")
-        objects = bucket.objects.filter(Prefix=f"bus_data/{date_str}")
-
-        logger.info(f"------- loading data at "
-                    f"{pendulum.now().to_datetime_string()}")
-
-        # load data
-        data_dict = {}
-
-        # Access denied for public bucket
-        obj_pbar = tqdm(objects)
-        for obj in obj_pbar:
-            obj_pbar.set_description(f"loading {obj}")
-            obj_name = obj.key
-            # https://stackoverflow.com/questions/31976273/open-s3-object-as-a-string-with-boto3
-            obj_body = json.loads(obj.get()["Body"].read().decode("utf-8"))
-            data_dict[obj_name] = obj_body
-
-        # parse data into actual vehicle locations and errors
-        logger.info(f"------- parsing data at "
-                    f"{pendulum.now().to_datetime_string()}")
-
-        data = pd.DataFrame()
-        errors = pd.DataFrame()
-
-        # k, v here are filename: full dict of JSON
-        data_dict_pbar = tqdm(data_dict.items())
-        for k, v in data_dict_pbar:
-            data_dict_pbar.set_description(f"processing {k}")
-            filename = k
-            new_data = pd.DataFrame()
-            new_errors = pd.DataFrame()
-            # expect ~12 "chunks" per JSON
-            for chunk, contents in v.items():
-                if "vehicle" in v[chunk]["bustime-response"].keys():
-                    new_data = new_data.append(
-                        pd.DataFrame(v[chunk]["bustime-response"]["vehicle"])
-                    )
-                if "error" in v[chunk]["bustime-response"].keys():
-                    new_errors = new_errors.append(
-                        pd.DataFrame(v[chunk]["bustime-response"]["error"])
-                    )
-            new_data["scrape_file"] = filename
-            new_errors["scrape_file"] = filename
-            data = data.append(new_data)
-            errors = errors.append(new_errors)
-
-        logger.info(f"------- saving data at "
-                    f"{pendulum.now().to_datetime_string()}")
-
-        if len(errors) > 0:
-            bucket.put_object(
-                Body=errors.to_csv(index=False),
-                Key=f"bus_full_day_errors_v2/{date_str}.csv",
-            )
-
-        if len(data) > 0:
-            # convert data time to actual datetime
-            data["data_time"] = pd.to_datetime(
-                data["tmstmp"], format="%Y%m%d %H:%M")
-            data["data_hour"] = data.data_time.dt.hour
-            data["data_date"] = data.data_time.dt.date
-
-            bucket.put_object(
-                Body=data.to_csv(index=False),
-                Key=f"bus_full_day_data_v2/{date_str}.csv",
-            )
-
-            # combine vids into a set (drops duplicates):
-            # https://stackoverflow.com/a/45925961
-            hourly_summary = (
-                data.groupby(["data_date", "data_hour", "rt", "des"])
-                .agg({"vid": set, "tatripid": set, "tablockid": set})
-                .reset_index()
-            )
-            # get number of vehicles per hour per route
-            hourly_summary["vh_count"] = hourly_summary["vid"].apply(len)
-            hourly_summary["trip_count"] = hourly_summary["tatripid"].apply(len)
-            hourly_summary["block_count"] = hourly_summary["tablockid"].apply(len)
-
-            bucket.put_object(
-                Body=hourly_summary.to_csv(index=False),
-                Key=f"bus_hourly_summary_v2/{date_str}.csv",
-            )
-        pd.concat([hourly_summary_combined, hourly_summary])
-    return data, errors, hourly_summary
+        data, errors = combine_daily_files.combine_daily_files(
+            day.to_date_string(), [f'chn-ghost-buses-{bucket_type}'], save)
 
 
+# can run from the root of the repo like:
+# python3 -m data_analysis.rt_daily_aggregations <start_date> <end_date> <bucket_name> <save>
 if __name__ == "__main__":
-    start_date = "2022-07-17"
-    end_date = "2022-08-07"
-
-    data, errors, hourly_summary = compute_hourly_totals(start_date, end_date)
+    app()
```
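Per the comment at the bottom of the new file, the command is run from the repo root, e.g. `python3 -m data_analysis.rt_daily_aggregations 2022-08-08 2022-10-08 public bucket` for the date range this PR backfills. As a quick smoke test that never touches S3, typer's test runner can exercise just the argument parsing; this is a minimal sketch (not part of the commit), assuming the repo root is on `PYTHONPATH`:

```python
from typer.testing import CliRunner

# imports the typer app defined in the diff above; requires running from repo root
from data_analysis.rt_daily_aggregations import app

runner = CliRunner()
# --help exits before the command body runs, so no AWS credentials are needed
result = runner.invoke(app, ["--help"])
assert result.exit_code == 0
print(result.stdout)  # lists START_DATE, END_DATE, BUCKET_TYPE, SAVE with their help text
```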
Empty file.
@@ -0,0 +1,105 @@ | ||
{ | ||
"cells": [ | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "12ebecef", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"import boto3\n", | ||
"import pendulum" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "dc86c046", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"s3 = boto3.resource('s3')\n", | ||
"private_bucket = s3.Bucket('chn-ghost-buses-private')\n", | ||
"public_bucket = s3.Bucket('chn-ghost-buses-public')" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "4b83b078", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"# copy raw data from the private bucket to the public bucket \n", | ||
"start_date = '2022-08-10'\n", | ||
"end_date = '2022-10-08'\n", | ||
"\n", | ||
"date_range = [d for d in pendulum.period(pendulum.from_format(start_date, 'YYYY-MM-DD'), pendulum.from_format(end_date, 'YYYY-MM-DD')).range('days')]" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "204bb9d3", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"# copy fully raw JSON data from private to public\n", | ||
"for day in date_range:\n", | ||
" date_str = day.to_date_string()\n", | ||
" print(f\"Processing {date_str} at {pendulum.now().to_datetime_string()}\")\n", | ||
" objects = private_bucket.objects.filter(Prefix = f'bus_data/{date_str}')\n", | ||
" for obj in objects:\n", | ||
" public_bucket.copy({'Bucket': private_bucket.name, 'Key': obj.key}, obj.key)" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "d2ac2752", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"# copy daily CSVs from private to public\n", | ||
"for day in date_range:\n", | ||
" date_str = day.to_date_string()\n", | ||
" print(f\"Processing {date_str} at {pendulum.now().to_datetime_string()}\")\n", | ||
" data_objects = private_bucket.objects.filter(Prefix = f'bus_full_day_data_v2/{date_str}.csv')\n", | ||
" error_objects = private_bucket.objects.filter(Prefix = f'bus_full_day_errors_v2/{date_str}.csv')\n", | ||
" for obj in data_objects:\n", | ||
" public_bucket.copy({'Bucket': private_bucket.name, 'Key': obj.key}, obj.key)\n", | ||
" for obj in error_objects: \n", | ||
" public_bucket.copy({'Bucket': private_bucket.name, 'Key': obj.key}, obj.key)" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "397493c2", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [] | ||
} | ||
], | ||
"metadata": { | ||
"kernelspec": { | ||
"display_name": "Python 3 (ipykernel)", | ||
"language": "python", | ||
"name": "python3" | ||
}, | ||
"language_info": { | ||
"codemirror_mode": { | ||
"name": "ipython", | ||
"version": 3 | ||
}, | ||
"file_extension": ".py", | ||
"mimetype": "text/x-python", | ||
"name": "python", | ||
"nbconvert_exporter": "python", | ||
"pygments_lexer": "ipython3", | ||
"version": "3.9.7" | ||
} | ||
}, | ||
"nbformat": 4, | ||
"nbformat_minor": 5 | ||
} |
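Since the notebook copies objects prefix-by-prefix, a cheap way to sanity-check the backfill is to compare object counts under the same prefix in both buckets. A minimal sketch (not part of the commit), assuming read access to both buckets:

```python
import boto3

s3 = boto3.resource("s3")
private_bucket = s3.Bucket("chn-ghost-buses-private")
public_bucket = s3.Bucket("chn-ghost-buses-public")

def count_objects(bucket, prefix: str) -> int:
    # objects.filter paginates lazily, so counting the iterator is enough here
    return sum(1 for _ in bucket.objects.filter(Prefix=prefix))

# spot-check the endpoints of the backfilled range
for date_str in ("2022-08-10", "2022-10-08"):
    prefix = f"bus_data/{date_str}"
    print(prefix,
          count_objects(private_bucket, prefix),
          count_objects(public_bucket, prefix))
```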