Merge pull request #825 from cal-itp/vp-accuracy

Vp accuracy

tiffanychu90 authored Aug 4, 2023
2 parents 5e49c5c + 1b218c8 commit aa23638
Showing 4 changed files with 260 additions and 210 deletions.
4 changes: 2 additions & 2 deletions _shared_utils/shared_utils/utils.py
@@ -9,11 +9,11 @@

 import dask_geopandas as dg
 import fsspec
-import gcsfs
 import geopandas as gpd
 import requests
+from calitp_data_analysis import get_fs
 
-fs = gcsfs.GCSFileSystem()
+fs = get_fs()
 
 
 def sanitize_file_path(file_name: str) -> str:
20 changes: 20 additions & 0 deletions rt_scheduled_v_ran/logs/spatial_accuracy.log
@@ -0,0 +1,20 @@
2023-08-03 09:00:53.761 | INFO | __main__:<module>:172 - Analysis date: 2023-06-14
2023-08-03 09:01:13.253 | INFO | __main__:<module>:198 - get delayed by operator: 0:00:19.490708
2023-08-03 09:41:41.514 | INFO | __main__:<module>:203 - compute results: 0:40:28.260681
2023-08-03 09:41:42.365 | INFO | __main__:<module>:214 - export: 0:00:00.850823
2023-08-03 09:41:42.365 | INFO | __main__:<module>:215 - execution time: 0:40:48.602212
2023-08-03 13:01:15.754 | INFO | __main__:<module>:196 - Analysis date: 2023-05-17
2023-08-03 13:01:39.246 | INFO | __main__:<module>:221 - get delayed by operator: 0:00:23.489165
2023-08-03 13:58:20.869 | INFO | __main__:<module>:226 - compute results: 0:56:41.622821
2023-08-03 13:58:21.706 | INFO | __main__:<module>:237 - export: 0:00:00.837454
2023-08-03 13:58:21.709 | INFO | __main__:<module>:238 - execution time: 0:57:05.949440
2023-08-03 14:49:15.922 | INFO | __main__:<module>:193 - Analysis date: 2023-04-12
2023-08-03 14:49:35.373 | INFO | __main__:<module>:218 - get delayed by operator: 0:00:19.450492
2023-08-03 15:38:51.126 | INFO | __main__:<module>:223 - compute results: 0:49:15.752969
2023-08-03 15:38:51.771 | INFO | __main__:<module>:234 - export: 0:00:00.645042
2023-08-03 15:38:51.772 | INFO | __main__:<module>:235 - execution time: 0:49:35.848503
2023-08-03 16:04:56.684 | INFO | __main__:<module>:193 - Analysis date: 2023-03-15
2023-08-03 16:05:26.228 | INFO | __main__:<module>:218 - get delayed by operator: 0:00:29.542906
2023-08-03 16:52:54.190 | INFO | __main__:<module>:223 - compute results: 0:47:27.961693
2023-08-03 16:52:54.953 | INFO | __main__:<module>:234 - export: 0:00:00.763199
2023-08-03 16:52:54.954 | INFO | __main__:<module>:235 - execution time: 0:47:58.267798
238 changes: 238 additions & 0 deletions rt_scheduled_v_ran/scripts/vp_spatial_accuracy.py
@@ -0,0 +1,238 @@
"""
Calculate a trip-level metric of spatial accuracy.
Newmark's GTFS RT spatial accuracy metric is simply
how many vehicle positions correctly join onto a buffered
shape.
"""
import dask.dataframe as dd
import dask_geopandas as dg
import datetime
import geopandas as gpd
import numpy as np
import pandas as pd
import sys

from dask import delayed, compute
from loguru import logger

from segment_speed_utils.project_vars import (COMPILED_CACHED_VIEWS, SEGMENT_GCS,
                                              RT_SCHED_GCS,
                                              analysis_date, PROJECT_CRS
                                              )

def grab_shape_keys_in_vp(analysis_date: str) -> pd.DataFrame:
"""
Subset raw vp and find unique trip_instance_keys.
Create crosswalk to link trip_instance_key to shape_array_key.
"""
vp_trip_df = (pd.read_parquet(
f"{SEGMENT_GCS}vp_{analysis_date}.parquet",
columns=["gtfs_dataset_key", "trip_instance_key"])
.drop_duplicates()
.dropna(subset="trip_instance_key")
).astype({"gtfs_dataset_key": "str"})

# Make sure we have a shape geometry too
shapes = pd.read_parquet(
f"{COMPILED_CACHED_VIEWS}routelines_{analysis_date}.parquet",
columns = ["shape_array_key"],
).dropna().drop_duplicates()

trips_with_shape = pd.read_parquet(
f"{COMPILED_CACHED_VIEWS}trips_{analysis_date}.parquet",
columns=["trip_instance_key", "shape_array_key"],
).merge(
shapes,
on = "shape_array_key",
how = "inner"
).merge(
vp_trip_df,
on = "trip_instance_key",
how = "inner"
).drop_duplicates().dropna().reset_index(drop=True)

return trips_with_shape
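
# Expected crosswalk output (illustrative; one row per trip observed in vp):
#   gtfs_dataset_key | trip_instance_key | shape_array_key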


def buffer_shapes(
    analysis_date: str,
    buffer_meters: int = 35,
    **kwargs
) -> gpd.GeoDataFrame:
    """
    Filter scheduled shapes down to the shapes that appear in vp.
    Buffer these.
    """
    # to_crs takes awhile, so filter down to only the shapes we need
    # (any filters arrive via **kwargs and are applied at read time)
    shapes = gpd.read_parquet(
        f"{COMPILED_CACHED_VIEWS}routelines_{analysis_date}.parquet",
        columns = ["shape_array_key", "geometry"],
        **kwargs
    ).to_crs(PROJECT_CRS)

    shapes = shapes.assign(
        geometry = shapes.geometry.buffer(buffer_meters)
    )

    return shapes
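
# Note: buffer() works in the units of PROJECT_CRS, so buffer_meters is only
# meaningful if PROJECT_CRS is a meter-based projected CRS (assumed here).
# Extra kwargs pass straight through to gpd.read_parquet, e.g. (illustrative,
# with some_keys being any list of shape_array_key values):
#   buffer_shapes(analysis_date, buffer_meters=35,
#                 filters=[[("shape_array_key", "in", some_keys)]])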


def attach_shape_geometry(
    analysis_date: str,
    trips_with_shape_subset: pd.DataFrame,
    buffer_meters: int = 35,
) -> gpd.GeoDataFrame:
    """
    Attach the shape geometry for a subset of shapes or trips.
    """
    shapes_subset = trips_with_shape_subset.shape_array_key.unique().tolist()

    shapes = buffer_shapes(
        analysis_date,
        buffer_meters = buffer_meters,
        filters = [[("shape_array_key", "in", shapes_subset)]]
    )

    trips_with_shape_geom = pd.merge(
        shapes,
        trips_with_shape_subset,
        on = "shape_array_key",
        how = "inner"
    )

    return trips_with_shape_geom


def merge_vp_with_shape_and_count(
    analysis_date: str,
    trips_with_shape_geom: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """
    Merge vp with crosswalk and buffered shapes.
    Get vp count totals and vp within shape.
    """
    subset_rt_keys = trips_with_shape_geom.gtfs_dataset_key.unique().tolist()
    subset_trips = trips_with_shape_geom.trip_instance_key.unique().tolist()

    vp = gpd.read_parquet(
        f"{SEGMENT_GCS}vp_{analysis_date}.parquet",
        columns=["trip_instance_key", "location_timestamp_local",
                 "geometry"],
        filters = [[("gtfs_dataset_key", "in", subset_rt_keys),
                    ("trip_instance_key", "in", subset_trips)]],
        #pc.field('trip_instance_key').is_valid()
    ).to_crs(PROJECT_CRS)

    vp2 = pd.merge(
        vp,
        trips_with_shape_geom,
        on = "trip_instance_key",
        how = "inner"
    ).reset_index(drop=True)

    total_vp = total_vp_counts_by_trip(vp2)

    vp2 = vp2.assign(
        is_within = vp2.geometry_x.within(vp2.geometry_y)
    ).query('is_within==True')

    vps_in_shape = (vp2.groupby("trip_instance_key",
                                observed = True, group_keys = False)
                    .agg({"location_timestamp_local": "count"})
                    .reset_index()
                    .rename(columns = {"location_timestamp_local": "vp_in_shape"})
                   )

    vps_in_shape = vps_in_shape.assign(
        vp_in_shape = vps_in_shape.vp_in_shape.fillna(0).astype(int)
    )

    count_df = pd.merge(
        total_vp,
        vps_in_shape,
        on = "trip_instance_key",
        how = "left"
    )

    return count_df
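
# The merge of two GeoDataFrames above keeps pandas' default suffixes:
# geometry_x is the vp point and geometry_y is the buffered shape polygon.
# Minimal illustration of the within() test (toy geometries, not pipeline data):
#   from shapely.geometry import LineString, Point
#   buffered = LineString([(0, 0), (100, 0)]).buffer(35)
#   Point(10, 5).within(buffered)    # True
#   Point(10, 80).within(buffered)   # False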


def total_vp_counts_by_trip(vp: gpd.GeoDataFrame) -> pd.DataFrame:
    """
    Get a count of vp for each trip, whether or not
    they fall within the buffered shape.
    """
    count_vp = (
        vp.groupby("trip_instance_key",
                   observed=True, group_keys=False)
        .agg({"location_timestamp_local": "count"})
        .reset_index()
        .rename(columns={"location_timestamp_local": "total_vp"})
    )

    count_vp = count_vp.assign(
        total_vp = count_vp.total_vp.fillna(0).astype(int)
    )

    return count_vp


if __name__ == "__main__":

    #from dask.distributed import LocalCluster, Client

    #client = Client("dask-scheduler.dask.svc.cluster.local:8786")
    #cluster = LocalCluster(n_workers = 2)

    LOG_FILE = "../logs/spatial_accuracy.log"
    logger.add(LOG_FILE, retention="3 months")
    logger.add(sys.stderr,
               format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
               level="INFO")

    logger.info(f"Analysis date: {analysis_date}")

    start = datetime.datetime.now()

    trips_with_shape = grab_shape_keys_in_vp(analysis_date)

    rt_operators = trips_with_shape.gtfs_dataset_key.unique().tolist()

    # Set up delayed lists of inputs by operator
    operator_trips_with_shapes = [
        delayed(attach_shape_geometry)(
            analysis_date,
            trips_with_shape[trips_with_shape.gtfs_dataset_key == rt_key],
            buffer_meters = 35,
        ) for rt_key in rt_operators
    ]

    operator_results = [
        delayed(merge_vp_with_shape_and_count)(
            analysis_date,
            operator_df
        ) for operator_df in operator_trips_with_shapes
    ]
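
    # dask.delayed only records these calls; nothing runs until compute() below.
    # Minimal illustration of the pattern (toy example, unrelated to this pipeline):
    #   lazy = [delayed(lambda x: x + 1)(i) for i in range(3)]
    #   [compute(t)[0] for t in lazy]  # -> [1, 2, 3]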

    time1 = datetime.datetime.now()
    logger.info(f"get delayed by operator: {time1 - start}")

    results = [compute(i)[0] for i in operator_results]

    time2 = datetime.datetime.now()
    logger.info(f"compute results: {time2 - time1}")

    results_df = pd.concat(
        results, axis=0
    ).reset_index(drop=True)

    results_df.to_parquet(
        f"{RT_SCHED_GCS}vp_spatial_accuracy_{analysis_date}.parquet")

    end = datetime.datetime.now()
    logger.info(f"export: {end - time2}")
    logger.info(f"execution time: {end - start}")

    #client.close()
    #cluster.close()