Add vp dwell time / pilot Big Blue Bus speeds post alignment #1180

Merged · 5 commits · Jul 18, 2024

Changes from all commits
28 changes: 15 additions & 13 deletions gtfs_funnel/vp_condenser.py
@@ -24,25 +24,26 @@ def condense_vp_to_linestring(
     We will group by trip and save out
     the vp point geom into a shapely.LineString.
     """
-    USABLE_VP = dict_inputs.speeds_tables.usable_vp
-    EXPORT_FILE = dict_inputs.speeds_tables.vp_condensed_line
+    USABLE_VP = dict_inputs.speeds_tables.usable_vp + "_with_dwell"
+    EXPORT_FILE = dict_inputs.speeds_tables.vp_condensed_line + "_dwell"
 
     vp = delayed(pd.read_parquet)(
         f"{SEGMENT_GCS}{USABLE_VP}_{analysis_date}",
         columns = ["trip_instance_key", "x", "y",
                    "vp_idx", "vp_primary_direction",
-                   "location_timestamp_local"
+                   "location_timestamp_local",
+                   "moving_timestamp_local",
                   ],
-    )
-
-    vp_gdf = delayed(wrangle_shapes.vp_as_gdf)(vp, crs = WGS84)
+    ).pipe(wrangle_shapes.vp_as_gdf, crs = WGS84)
 
     vp_condensed = delayed(vp_transform.condense_point_geom_to_line)(
-        vp_gdf,
+        vp,
         group_cols = ["trip_instance_key"],
         geom_col = "geometry",
         other_cols = ["vp_idx", "location_timestamp_local",
-                      "vp_primary_direction"],
+                      "moving_timestamp_local",
+                      "vp_primary_direction",
+                     ],
     ).set_geometry("geometry").set_crs(WGS84)
 
     vp_condensed = compute(vp_condensed)[0]
@@ -69,8 +70,8 @@ def prepare_vp_for_all_directions(
     Subset vp_idx, location_timestamp_local and coordinate arrays
     to exclude southbound.
     """
-    INPUT_FILE = dict_inputs.speeds_tables.vp_condensed_line
-    EXPORT_FILE = dict_inputs.speeds_tables.vp_nearest_neighbor
+    INPUT_FILE = dict_inputs.speeds_tables.vp_condensed_line + "_dwell"
+    EXPORT_FILE = dict_inputs.speeds_tables.vp_nearest_neighbor + "_dwell"
 
     vp = delayed(gpd.read_parquet)(
         f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}.parquet",
@@ -109,11 +110,12 @@ def prepare_vp_for_all_directions(
                format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
                level="INFO")
 
+    from shared_utils import rt_dates
 
-    for analysis_date in analysis_date_list:
+    for analysis_date in [rt_dates.DATES["apr2024"]]:#analysis_date_list:
         start = datetime.datetime.now()
 
-        condense_vp_to_linestring(analysis_date, GTFS_DATA_DICT)
+        #condense_vp_to_linestring(analysis_date, GTFS_DATA_DICT)
 
         time1 = datetime.datetime.now()
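
For orientation, here is a minimal sketch of what the condensing step produces, assuming vp_transform.condense_point_geom_to_line behaves roughly like a sort-and-collect groupby (the real helper lives in segment_speed_utils, so this stand-in and its names are approximations): one row per trip, with the point geoms collapsed into a LineString and the other columns collapsed into parallel arrays ordered by vp_idx.

import geopandas as gpd
from shapely.geometry import LineString

def condense_sketch(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    # Hypothetical stand-in, not the segment_speed_utils helper.
    # Assumes each trip has at least 2 vp (a LineString needs 2+ points).
    other_cols = ["vp_idx", "location_timestamp_local",
                  "moving_timestamp_local", "vp_primary_direction"]
    condensed = (
        gdf.sort_values(["trip_instance_key", "vp_idx"])
           .groupby("trip_instance_key")
           .agg({"geometry": lambda pts: LineString(pts.tolist()),
                 **{c: list for c in other_cols}})
           .reset_index()
    )
    return gpd.GeoDataFrame(condensed, geometry="geometry", crs=gdf.crs)

Downstream, prepare_vp_for_all_directions subsets these per-trip arrays by direction.
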
227 changes: 227 additions & 0 deletions gtfs_funnel/vp_dwell_time.py
@@ -0,0 +1,227 @@
"""
Add dwell time to vp
"""
import datetime
import pandas as pd
import sys

from dask import delayed, compute
from loguru import logger

from segment_speed_utils import helpers, segment_calcs
from segment_speed_utils.project_vars import SEGMENT_GCS

def import_vp(analysis_date: str) -> pd.DataFrame:
    """
    Import vehicle positions with the subset of columns
    we need to check whether a bus is dwelling
    at a location.
    """
    vp = pd.read_parquet(
        f"{SEGMENT_GCS}vp_usable_{analysis_date}",
        columns = [
            "trip_instance_key", "vp_idx",
            "location_timestamp_local", "vp_primary_direction"
        ],
    )

    return vp


def group_vp_dwelling_rows(df: pd.DataFrame) -> pd.DataFrame:
    """
    We do not know how many vp have repeated positions consecutively,
    but we want to consolidate as many as we can until the bus moves
    to the next position.

    We know that the prior position has a vp index of vp_idx - 1.
    If the prior vp is not that, the bus has moved on.
    This matters because a bus on a loop route can revisit a stop:
    it can stop at a plaza, go elsewhere, and come back to that plaza,
    and we don't want to mistakenly group those non-consecutive vp.
    """
    df = df.assign(
        #prior_expected = df.vp_idx - 1,
        prior = (df.sort_values(["trip_instance_key", "vp_idx"])
                 .groupby("trip_instance_key", observed=True, group_keys=False)
                 .vp_idx
                 .apply(lambda x: x.shift(1))
                )
    )

    df = df.assign(
        # flag whether the vp is moving (we want 0's to show up for
        # dwelling vp because that is what makes the cumsum() work)
        is_moving = df.apply(
            lambda x:
            0 if x.prior == x.prior_expected
            else 1, axis=1).astype("int8")
    )

    return df


def split_into_moving_and_dwelling(vp: pd.DataFrame):
    """
    Use vp_primary_direction to split vp into either moving vp or dwelling vp.
    Dwelling vp need extra transforms to figure out how long the bus dwelled.
    A vp's direction is Unknown when there was no movement (its x, y
    matches the prior vp's), so no direction could be calculated.
    The only exception is the first vp of a trip, because there is no
    prior point against which to calculate direction.
    """
    usable_bounds = segment_calcs.get_usable_vp_bounds_by_trip(
        vp
    ).drop(columns = "max_vp_idx")

    vp2 = pd.merge(
        vp,
        usable_bounds,
        on = "trip_instance_key",
        how = "inner"
    )

    vp2 = vp2.assign(
        prior_expected = vp2.vp_idx - 1,
    )

    # Keep the subset of vp that precede unknowns;
    # we want to grab just the one position above each unknown
    subset_vp_prior = vp2[
        vp2.vp_primary_direction=="Unknown"
    ].prior_expected.unique().tolist()

    subset_unknown_vp = vp2[
        vp2.vp_primary_direction=="Unknown"
    ].vp_idx.unique().tolist()

    # These vp have unknowns and may need consolidating;
    # leave the first vp in, just in case the second vp is unknown
    vp_unknowns = vp2.loc[
        vp2.vp_idx.isin(subset_vp_prior + subset_unknown_vp)
    ]

    # The vast majority of vp should be here, and we want to
    # separate these out because no change is happening
    # and we don't want to do an expensive row-wise shift on them
    vp_knowns = vp2.loc[~vp2.vp_idx.isin(subset_vp_prior + subset_unknown_vp)]

    vp_unknowns2 = group_vp_dwelling_rows(vp_unknowns)

    vp3 = pd.concat(
        [vp_knowns, vp_unknowns2],
        axis=0, ignore_index=True
    ).drop(
        columns = ["prior", "prior_expected"]
    ).fillna(
        {"is_moving": 1}
    ).astype(
        {"is_moving": "int8"}
    ).sort_values("vp_idx").reset_index(drop=True)

    vp3 = vp3.assign(
        # since is_moving=0 if the vp is dwelling,
        # the cumsum() will not change from the prior vp,
        # so a run of 2 or 3 dwelling vp holds the same vp_grouping value;
        # once the vp moves and is_moving=1, the cumsum() increases again
        vp_grouping = (vp3.groupby("trip_instance_key",
                                   observed=True, group_keys=False)
                       .is_moving
                       .cumsum()
                      )
    )

    return vp3
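
# Editor's illustration (not part of the module): consider one trip with
# vp_idx 10-14 and directions [North, Unknown, Unknown, North, North],
# where vp 11 and 12 repeat vp 10's position. The unknown subset
# {10, 11, 12} gets is_moving = [1, 0, 0] (vp 11 and 12 each directly
# follow their prior vp_idx); the knowns 13-14 are filled with is_moving = 1.
# Within the trip, is_moving = [1, 0, 0, 1, 1], so its cumsum gives
# vp_grouping = [1, 1, 1, 2, 3]: vp 10-12 collapse into one dwelling
# group anchored at vp 10's location and timestamp.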


def add_dwell_time(
    vp_grouped: pd.DataFrame,
) -> pd.DataFrame:
    """
    Take vp that have their groups flagged and
    add dwell time (in seconds). Dwell time is calculated
    for each vp location, which is not necessarily a bus stop.
    """
    group_cols = ["trip_instance_key", "vp_grouping"]

    start_vp = (vp_grouped
                .groupby(group_cols, observed=True, group_keys=False)
                .agg({
                    "vp_idx": "min",
                    "location_timestamp_local": "min",
                    "vp_primary_direction": "count"
                }).reset_index()
                .rename(columns = {"vp_primary_direction": "n_vp_at_location"})
               )

    end_vp = (vp_grouped
              .groupby(group_cols, observed=True, group_keys=False)
              .agg({
                  "vp_idx": "max",
                  "location_timestamp_local": "max"
              }).reset_index()
              .rename(columns = {
                  "vp_idx": "end_vp_idx",
                  "location_timestamp_local": "moving_timestamp_local"
              })
             )

    df = pd.merge(
        start_vp,
        end_vp,
        on = group_cols,
        how = "inner"
    )

    df = df.assign(
        dwell_sec = (df.moving_timestamp_local -
                     df.location_timestamp_local).dt.total_seconds().astype("int")
    )

    return df

if __name__ == "__main__":

    LOG_FILE = "./logs/vp_preprocessing.log"
    logger.add(LOG_FILE, retention="3 months")
    logger.add(sys.stderr,
               format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
               level="INFO")

    analysis_date = "2024-04-17"

    start = datetime.datetime.now()

    vp = delayed(import_vp)(analysis_date)

    vp_grouped = delayed(split_into_moving_and_dwelling)(vp)

    vp_with_dwell = delayed(add_dwell_time)(vp_grouped)

    vp_with_dwell = compute(vp_with_dwell)[0]

    time1 = datetime.datetime.now()
    logger.info(f"compute dwell df: {time1 - start}")

    vp_usable = pd.read_parquet(
        f"{SEGMENT_GCS}vp_usable_{analysis_date}",
    )

    vp_usable_with_dwell = pd.merge(
        vp_usable,
        vp_with_dwell,
        on = ["trip_instance_key", "vp_idx", "location_timestamp_local"],
        how = "inner"
    )

    helpers.if_exists_then_delete(
        f"{SEGMENT_GCS}vp_usable_with_dwell_{analysis_date}")

    vp_usable_with_dwell.to_parquet(
        f"{SEGMENT_GCS}vp_usable_with_dwell_{analysis_date}",
        partition_cols = "gtfs_dataset_key",
    )

    end = datetime.datetime.now()
    logger.info(f"merge with original and export: {end - time1}")
    logger.info(f"vp with dwell time: {end - start}")
763 changes: 763 additions & 0 deletions rt_segment_speeds/37_bbb_speeds_with_dwell.ipynb

Large diffs are not rendered by default.
