diff --git a/_shared_utils/shared_utils/utils.py b/_shared_utils/shared_utils/utils.py index 174f71bb6..34e7bf322 100644 --- a/_shared_utils/shared_utils/utils.py +++ b/_shared_utils/shared_utils/utils.py @@ -9,11 +9,11 @@ import dask_geopandas as dg import fsspec -import gcsfs import geopandas as gpd import requests +from calitp_data_analysis import get_fs -fs = gcsfs.GCSFileSystem() +fs = get_fs() def sanitize_file_path(file_name: str) -> str: diff --git a/rt_scheduled_v_ran/logs/spatial_accuracy.log b/rt_scheduled_v_ran/logs/spatial_accuracy.log new file mode 100644 index 000000000..24044eecc --- /dev/null +++ b/rt_scheduled_v_ran/logs/spatial_accuracy.log @@ -0,0 +1,20 @@ +2023-08-03 09:00:53.761 | INFO | __main__::172 - Analysis date: 2023-06-14 +2023-08-03 09:01:13.253 | INFO | __main__::198 - get delayed by operator: 0:00:19.490708 +2023-08-03 09:41:41.514 | INFO | __main__::203 - compute results: 0:40:28.260681 +2023-08-03 09:41:42.365 | INFO | __main__::214 - export: 0:00:00.850823 +2023-08-03 09:41:42.365 | INFO | __main__::215 - execution time: 0:40:48.602212 +2023-08-03 13:01:15.754 | INFO | __main__::196 - Analysis date: 2023-05-17 +2023-08-03 13:01:39.246 | INFO | __main__::221 - get delayed by operator: 0:00:23.489165 +2023-08-03 13:58:20.869 | INFO | __main__::226 - compute results: 0:56:41.622821 +2023-08-03 13:58:21.706 | INFO | __main__::237 - export: 0:00:00.837454 +2023-08-03 13:58:21.709 | INFO | __main__::238 - execution time: 0:57:05.949440 +2023-08-03 14:49:15.922 | INFO | __main__::193 - Analysis date: 2023-04-12 +2023-08-03 14:49:35.373 | INFO | __main__::218 - get delayed by operator: 0:00:19.450492 +2023-08-03 15:38:51.126 | INFO | __main__::223 - compute results: 0:49:15.752969 +2023-08-03 15:38:51.771 | INFO | __main__::234 - export: 0:00:00.645042 +2023-08-03 15:38:51.772 | INFO | __main__::235 - execution time: 0:49:35.848503 +2023-08-03 16:04:56.684 | INFO | __main__::193 - Analysis date: 2023-03-15 +2023-08-03 16:05:26.228 | INFO | __main__::218 - get delayed by operator: 0:00:29.542906 +2023-08-03 16:52:54.190 | INFO | __main__::223 - compute results: 0:47:27.961693 +2023-08-03 16:52:54.953 | INFO | __main__::234 - export: 0:00:00.763199 +2023-08-03 16:52:54.954 | INFO | __main__::235 - execution time: 0:47:58.267798 diff --git a/rt_scheduled_v_ran/scripts/vp_spatial_accuracy.py b/rt_scheduled_v_ran/scripts/vp_spatial_accuracy.py new file mode 100644 index 000000000..74527b4da --- /dev/null +++ b/rt_scheduled_v_ran/scripts/vp_spatial_accuracy.py @@ -0,0 +1,238 @@ +""" +Calculate a trip-level metric of spatial accuracy. + +Newmark's GTFS RT spatial accuracy metric is simply +how many vehicle positions correctly join onto a buffered +shape. +""" +import dask.dataframe as dd +import dask_geopandas as dg +import datetime +import geopandas as gpd +import numpy as np +import pandas as pd +import sys + +from dask import delayed, compute +from loguru import logger + +from segment_speed_utils.project_vars import (COMPILED_CACHED_VIEWS, SEGMENT_GCS, + RT_SCHED_GCS, + analysis_date, PROJECT_CRS + ) + +def grab_shape_keys_in_vp(analysis_date: str) -> pd.DataFrame: + """ + Subset raw vp and find unique trip_instance_keys. + Create crosswalk to link trip_instance_key to shape_array_key. 
+    """
+    vp_trip_df = (pd.read_parquet(
+        f"{SEGMENT_GCS}vp_{analysis_date}.parquet",
+        columns=["gtfs_dataset_key", "trip_instance_key"])
+        .drop_duplicates()
+        .dropna(subset="trip_instance_key")
+    ).astype({"gtfs_dataset_key": "str"})
+
+    # Make sure the trip's shape actually appears in routelines
+    shapes = pd.read_parquet(
+        f"{COMPILED_CACHED_VIEWS}routelines_{analysis_date}.parquet",
+        columns = ["shape_array_key"],
+    ).dropna().drop_duplicates()
+
+    trips_with_shape = pd.read_parquet(
+        f"{COMPILED_CACHED_VIEWS}trips_{analysis_date}.parquet",
+        columns=["trip_instance_key", "shape_array_key"],
+    ).merge(
+        shapes,
+        on = "shape_array_key",
+        how = "inner"
+    ).merge(
+        vp_trip_df,
+        on = "trip_instance_key",
+        how = "inner"
+    ).drop_duplicates().dropna().reset_index(drop=True)
+
+    return trips_with_shape
+
+
+def buffer_shapes(
+    analysis_date: str,
+    buffer_meters: int = 35,
+    **kwargs
+) -> gpd.GeoDataFrame:
+    """
+    Filter scheduled shapes down to the shapes that appear in vp.
+    Buffer these.
+    """
+    # to_crs takes a while, so filter (via **kwargs) to only the shapes we need
+    shapes = gpd.read_parquet(
+        f"{COMPILED_CACHED_VIEWS}routelines_{analysis_date}.parquet",
+        columns = ["shape_array_key", "geometry"],
+        **kwargs
+    ).to_crs(PROJECT_CRS)
+
+    # Buffer in meters, since PROJECT_CRS is a projected CRS
+    shapes = shapes.assign(
+        geometry = shapes.geometry.buffer(buffer_meters)
+    )
+
+    return shapes
+
+
+def attach_shape_geometry(
+    analysis_date: str,
+    trips_with_shape_subset: pd.DataFrame,
+    buffer_meters: int = 35,
+) -> gpd.GeoDataFrame:
+    """
+    Attach the shape geometry for a subset of shapes or trips.
+    """
+    shapes_subset = trips_with_shape_subset.shape_array_key.unique().tolist()
+
+    shapes = buffer_shapes(
+        analysis_date,
+        buffer_meters = buffer_meters,
+        filters = [[("shape_array_key", "in", shapes_subset)]]
+    )
+
+    trips_with_shape_geom = pd.merge(
+        shapes,
+        trips_with_shape_subset,
+        on = "shape_array_key",
+        how = "inner"
+    )
+
+    return trips_with_shape_geom
+
+
+def merge_vp_with_shape_and_count(
+    analysis_date: str,
+    trips_with_shape_geom: gpd.GeoDataFrame
+) -> gpd.GeoDataFrame:
+    """
+    Merge vp with crosswalk and buffered shapes.
+    Get vp count totals and vp within shape.
+    """
+    subset_rt_keys = trips_with_shape_geom.gtfs_dataset_key.unique().tolist()
+    subset_trips = trips_with_shape_geom.trip_instance_key.unique().tolist()
+
+    # Read vp only for this operator's datasets and trips
+    vp = gpd.read_parquet(
+        f"{SEGMENT_GCS}vp_{analysis_date}.parquet",
+        columns=["trip_instance_key", "location_timestamp_local",
+                 "geometry"],
+        filters = [[("gtfs_dataset_key", "in", subset_rt_keys),
+                    ("trip_instance_key", "in", subset_trips)]],
+    ).to_crs(PROJECT_CRS)
+
+    vp2 = pd.merge(
+        vp,
+        trips_with_shape_geom,
+        on = "trip_instance_key",
+        how = "inner"
+    ).reset_index(drop=True)
+
+    # Count all vp per trip before filtering to vp within the buffered shape
+    total_vp = total_vp_counts_by_trip(vp2)
+
+    # geometry_x is the vp point, geometry_y is the buffered shape polygon
+    vp2 = vp2.assign(
+        is_within = vp2.geometry_x.within(vp2.geometry_y)
+    ).query('is_within==True')
+
+    vps_in_shape = (vp2.groupby("trip_instance_key",
+                                observed = True, group_keys = False)
+                    .agg({"location_timestamp_local": "count"})
+                    .reset_index()
+                    .rename(columns = {"location_timestamp_local": "vp_in_shape"})
+                   )
+
+    count_df = pd.merge(
+        total_vp,
+        vps_in_shape,
+        on = "trip_instance_key",
+        how = "left"
+    )
+
+    # Trips with no vp in the shape are NaN after the left merge; fill with 0
+    count_df = count_df.assign(
+        vp_in_shape = count_df.vp_in_shape.fillna(0).astype(int)
+    )
+
+    return count_df
+
+
+def total_vp_counts_by_trip(vp: gpd.GeoDataFrame) -> pd.DataFrame:
+    """
+    Get a count of vp for each trip, regardless of whether
+    those vp fall within the buffered shape.
+    """
+    count_vp = (
+        vp.groupby("trip_instance_key",
+                   observed=True, group_keys=False)
+        .agg({"location_timestamp_local": "count"})
+        .reset_index()
+        .rename(columns={"location_timestamp_local": "total_vp"})
+    )
+
+    count_vp = count_vp.assign(
+        total_vp = count_vp.total_vp.fillna(0).astype(int)
+    )
+
+    return count_vp
+
+
+if __name__=="__main__":
+
+    #from dask.distributed import LocalCluster, Client
+
+    #client = Client("dask-scheduler.dask.svc.cluster.local:8786")
+    #cluster = LocalCluster(n_workers = 2)
+
+    LOG_FILE = "../logs/spatial_accuracy.log"
+    logger.add(LOG_FILE, retention="3 months")
+    logger.add(sys.stderr,
+               format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
+               level="INFO")
+
+    logger.info(f"Analysis date: {analysis_date}")
+
+    start = datetime.datetime.now()
+
+    trips_with_shape = grab_shape_keys_in_vp(analysis_date)
+
+    rt_operators = trips_with_shape.gtfs_dataset_key.unique().tolist()
+
+    # Set up delayed lists of inputs by operator
+    operator_trips_with_shapes = [
+        delayed(attach_shape_geometry)(
+            analysis_date,
+            trips_with_shape[trips_with_shape.gtfs_dataset_key == rt_key],
+            buffer_meters = 35,
+        ) for rt_key in rt_operators
+    ]
+
+    operator_results = [
+        delayed(merge_vp_with_shape_and_count)(
+            analysis_date,
+            operator_df
+        ) for operator_df in operator_trips_with_shapes
+    ]
+
+    time1 = datetime.datetime.now()
+    logger.info(f"get delayed by operator: {time1 - start}")
+
+    results = [compute(i)[0] for i in operator_results]
+
+    time2 = datetime.datetime.now()
+    logger.info(f"compute results: {time2 - time1}")
+
+    results_df = pd.concat(
+        results, axis=0
+    ).reset_index(drop=True)
+
+    results_df.to_parquet(
+        f"{RT_SCHED_GCS}vp_spatial_accuracy_{analysis_date}.parquet")
+
+    end = datetime.datetime.now()
+    logger.info(f"export: {end - time2}")
+    logger.info(f"execution time: {end - start}")
+
+    #client.close()
+    #cluster.close()
\ No newline at end of file
diff --git a/rt_segment_speeds/scripts/C4_spatial_accuracy.py b/rt_segment_speeds/scripts/C4_spatial_accuracy.py
deleted file mode 100644
index 24073d27c..000000000
--- a/rt_segment_speeds/scripts/C4_spatial_accuracy.py
+++ /dev/null
@@ -1,208 +0,0 @@
-""" -Calculate a trip-level metric of spatial accuracy. - -Newmark's GTFS RT spatial accuracy metric is simply -how many vehicle positions correctly join onto a buffered -shape. -""" -import dask.dataframe as dd -import dask_geopandas as dg -import datetime -import geopandas as gpd -import pandas as pd - -from dask import delayed, compute - -from segment_speed_utils.project_vars import (COMPILED_CACHED_VIEWS, SEGMENT_GCS, - analysis_date, PROJECT_CRS - ) - -def grab_shape_keys_in_vp(analysis_date: str) -> pd.DataFrame: - """ - Subset raw vp and find unique trip_instance_keys. - Create crosswalk to link trip_instance_key to shape_array_key. - """ - vp_trips = (pd.read_parquet( - f"{SEGMENT_GCS}vp_{analysis_date}.parquet", - columns=["trip_instance_key"]) - .drop_duplicates() - .trip_instance_key - .tolist() - ) - - trips_with_shape = pd.read_parquet( - f"{COMPILED_CACHED_VIEWS}trips_{analysis_date}.parquet", - columns=["trip_instance_key", "shape_array_key"], - filters=[[("trip_instance_key", "in", vp_trips)]], - ) - - return trips_with_shape - - -def buffer_shapes( - analysis_date: str, - buffer_meters: int = 35, - **kwargs -) -> gpd.GeoDataFrame: - """ - Filter scheduled shapes down to the shapes that appear in vp. - Buffer these. - """ - shapes = gpd.read_parquet( - f"{COMPILED_CACHED_VIEWS}routelines_{analysis_date}.parquet", - columns = ["shape_array_key", "geometry"], - **kwargs - ).to_crs(PROJECT_CRS) - - # to_crs takes awhile, so do a filtering on only shapes we need - shapes = shapes.assign( - geometry = shapes.geometry.buffer(buffer_meters) - ) - - return shapes - - -def merge_vp_with_shape( - analysis_date: str, - trips_with_shape: gpd.GeoDataFrame -) -> gpd.GeoDataFrame: - """ - Merge vp with crosswalk and buffered shapes. - """ - subset_trips = trips_with_shape.trip_instance_key.unique().tolist() - - vp = dg.read_parquet( - f"{SEGMENT_GCS}vp_{analysis_date}.parquet", - columns=["trip_instance_key", "location_timestamp_local", - "geometry"], - ).to_crs(PROJECT_CRS) - - vp2 = dd.merge( - vp, - trips_with_shape, - on = "trip_instance_key", - how = "inner" - ) - - return vp2 - - -def subset_vp(vp: dg.GeoDataFrame, one_shape: str) -> dg.GeoDataFrame: - vp2 = vp[vp.shape_array_key==one_shape].reset_index(drop=True) - return vp2 - - -def total_vp_counts_by_trip(vp: dg.GeoDataFrame) -> dd.DataFrame: - # Get a count of vp for each trip, whether or not those fall - # within buffered shape or not - count_vp = ( - vp.groupby("trip_instance_key", - observed=True, group_keys=False) - .agg({"location_timestamp_local": "count"}) - .reset_index() - .rename(columns={"location_timestamp_local": "total_vp"}) - ) - - return count_vp - - -def vp_in_shape_by_trip( - vp: dg.GeoDataFrame, - one_shape: gpd.GeoDataFrame, -) -> dd.DataFrame: - """ - Find if vp point intersects with our buffered shape. - Get counts by trip. 
- """ - vp2 = dg.sjoin( - vp, - one_shape, - how = "inner", - predicate = "intersects" - )[["trip_instance_key", "location_timestamp_local"]].drop_duplicates() - - count_in_shape = ( - vp2.groupby("trip_instance_key", - observed=True, group_keys=False) - .agg({"location_timestamp_local": "count"}) - .reset_index() - .rename(columns={"location_timestamp_local": "vp_in_shape"}) - ) - - return count_in_shape - - -if __name__=="__main__": - - #from dask.distributed import Client - - #client = Client("dask-scheduler.dask.svc.cluster.local:8786") - - print(f"Analysis date: {analysis_date}") - - start = datetime.datetime.now() - - trips_with_shape = grab_shape_keys_in_vp(analysis_date) - shapes_in_vp = trips_with_shape.shape_array_key.unique().tolist() - print(shapes_in_vp) - - shapes = delayed(buffer_shapes)( - analysis_date, - buffer_meters = 35, - filters = [[("shape_array_key", "in", shapes_in_vp)]] - ) - - vp_with_shape = delayed(merge_vp_with_shape)( - analysis_date, trips_with_shape) - - vp_dfs = [ - delayed(subset_vp)(vp_with_shape, shape) - for shape in shapes_in_vp - ] - - shape_dfs = [ - delayed(shapes[shapes.shape_array_key==s]) - for s in shapes_in_vp - ] - - results = [ - delayed(vp_in_shape_by_trip)(one_vp_df, one_shape_df).persist() - for one_vp_df, one_shape_df in zip(vp_dfs, shape_dfs) - ] - - results2 = [compute(i)[0] for i in results] - - vp_trip_in_shape_totals = dd.multi.concat( - results2, axis=0).set_index("trip_instance_key") - - print(vp_trip_in_shape_totals.dtypes) - - time1 = datetime.datetime.now() - print(f"get trip counts in shape: {time1 - start}") - - vp_trip_totals = total_vp_counts_by_trip( - vp_with_shape).set_index("trip_instance_key") - - time2 = datetime.datetime.now() - print(f"get trip total counts: {time2 - time1}") - - - # Merge our total counts by trip with the vp that fall within shape - results_df = dd.merge( - vp_trip_totals, - vp_trip_in_shape_totals, - left_index = True, - right_index = True, - how = "left" - ) - - results_df.to_parquet( - f"{SEGMENT_GCS}trip_summary/vp_spatial_accuracy_{analysis_date}.parquet", - #overwrite = True, - ) - - end = datetime.datetime.now() - print(f"export: {end - time1}") - print(f"execution time: {end - start}") - - #client.close() \ No newline at end of file