Skip to content

Commit

Permalink
VehicleBeforeStop: group filter to restore stop updates
Browse files Browse the repository at this point in the history
  • Loading branch information
paulswartz committed Mar 23, 2018
1 parent 0f81cfd commit c0a67d0
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 6 deletions.
1 change: 1 addition & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ config :concentrate,
Concentrate.GroupFilter.TimeOutOfRange,
Concentrate.GroupFilter.RemoveUnneededTimes,
Concentrate.GroupFilter.VehiclePastStop,
Concentrate.GroupFilter.VehicleBeforeStop,
Concentrate.GroupFilter.Shuttle,
Concentrate.GroupFilter.SkippedDepartures,
Concentrate.GroupFilter.CancelledTrip,
Expand Down
101 changes: 101 additions & 0 deletions lib/concentrate/group_filter/cache/vehicle_before_stop.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
defmodule Concentrate.GroupFilter.Cache.VehicleBeforeStop do
@moduledoc """
Server to maintain a cache of previously seen StopTimeUpdates for a given trip.
As the vehicle moves through, we'll remove the older updates. Periodically,
we'll scan for StopTimeUpdates in the past and remove them.
"""
use GenServer
alias Concentrate.{VehiclePosition, StopTimeUpdate}

@table __MODULE__
# 5 minutes
@stale_timeout_seconds 300

@spec stop_time_updates_for_vehicle(VehiclePosition.t(), [StopTimeUpdate.t()]) :: [
StopTimeUpdate.t()
]
def stop_time_updates_for_vehicle(vehicle_position, stop_time_updates) do
if is_integer(VehiclePosition.stop_sequence(vehicle_position)) do
insert_new_updates!(stop_time_updates)
delete_old_updates(vehicle_position)
fetch_updates_with_stop_sequence_ge_than_vehicle(vehicle_position)
else
stop_time_updates
end
rescue
ArgumentError ->
stop_time_updates
end

defp insert_new_updates!(stop_time_updates) do
inserts =
for stu <- stop_time_updates,
stop_sequence <- List.wrap(StopTimeUpdate.stop_sequence(stu)) do
trip_id = StopTimeUpdate.trip_id(stu)
time = StopTimeUpdate.time(stu)
:ets.match_delete(@table, {trip_id, stop_sequence, :_, :_})
{trip_id, stop_sequence, time, stu}
end

:ets.insert(@table, inserts)
end

defp fetch_updates_with_stop_sequence_ge_than_vehicle(vp) do
unsorted =
:ets.select(@table, [
{
{VehiclePosition.trip_id(vp), :"$1", :_, :"$2"},
[{:>=, :"$1", VehiclePosition.stop_sequence(vp)}],
[:"$2"]
}
])

Enum.sort_by(unsorted, &StopTimeUpdate.stop_sequence/1)
end

defp delete_old_updates(vp) do
:ets.select_delete(@table, [
{
{VehiclePosition.trip_id(vp), :"$1", :_, :_},
[{:<, :"$1", VehiclePosition.stop_sequence(vp)}],
[true]
}
])
end

def start_link([]) do
GenServer.start_link(__MODULE__, [])
end

@impl GenServer
def init([]) do
@table = :ets.new(@table, [:bag, :named_table, :public])
schedule_clear!()
{:ok, []}
end

@impl GenServer
def handle_info(:clear, state) do
now = System.system_time(:seconds)
minimum_time = now - @stale_timeout_seconds

:ets.select_delete(@table, [
{
{:_, :_, :"$1", :_},
[{:<, :"$1", minimum_time}],
[true]
}
])

{:noreply, state}
end

def handle_info(message, state) do
super(message, state)
end

defp schedule_clear! do
send(self(), :clear)
end
end
17 changes: 17 additions & 0 deletions lib/concentrate/group_filter/vehicle_before_stop.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Concentrate.GroupFilter.VehicleBeforeStop do
@moduledoc """
Adds a historic StopTimeUpdate for the trip if the vehicle hasn't moved past it yet.
"""
@behaviour Concentrate.GroupFilter
alias Concentrate.TripUpdate
alias Concentrate.GroupFilter.Cache.VehicleBeforeStop, as: Cache

@impl Concentrate.GroupFilter
def filter({%TripUpdate{} = tu, [vp], stus}) do
stus = Cache.stop_time_updates_for_vehicle(vp, stus)

{tu, [vp], stus}
end

def filter(other), do: other
end
9 changes: 5 additions & 4 deletions lib/concentrate/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@ defmodule Concentrate.Supervisor do
end

def children(config) do
pool = pool()
misc = misc()
alerts = alerts(config[:alerts])
gtfs = gtfs(config[:gtfs])
pipeline = pipeline(config)
Enum.concat([pool, alerts, gtfs, pipeline])
Enum.concat([misc, alerts, gtfs, pipeline])
end

def pool do
def misc do
[
:hackney_pool.child_spec(:http_producer_pool, timeout: 30_000, max_connections: 100)
:hackney_pool.child_spec(:http_producer_pool, timeout: 30_000, max_connections: 100),
Concentrate.GroupFilter.Cache.VehicleBeforeStop
]
end

Expand Down
81 changes: 81 additions & 0 deletions test/concentrate/group_filter/cache/vehicle_before_stop_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
defmodule Concentrate.GroupFilter.Cache.VehicleBeforeStopTest do
@moduledoc false
use ExUnit.Case
import Concentrate.GroupFilter.Cache.VehicleBeforeStop
alias Concentrate.{VehiclePosition, StopTimeUpdate}

defp supervised(_) do
{:ok, _} = start_supervised(Concentrate.GroupFilter.Cache.VehicleBeforeStop)
:ok
end

describe "stop_time_updates_for_vehicle/2" do
setup :supervised

test "restores older StopTimeUpdate values if the vehicle hasn't reached them" do
trip_id = "before_stop_test"

vp =
VehiclePosition.new(
id: "vehicle",
trip_id: trip_id,
stop_sequence: 1,
latitude: 1.0,
longitude: 1.0
)

stus =
for stop_sequence <- 1..4 do
StopTimeUpdate.new(trip_id: trip_id, stop_sequence: stop_sequence)
end

assert stop_time_updates_for_vehicle(vp, stus) == stus
# restores the first two StopTimeUpdates since the vehicle hasn't
# reached them
assert stop_time_updates_for_vehicle(vp, Enum.drop(stus, 2)) == stus

# restores the second StopTimeUpdate since the vehicle is past the
# first one
vp = VehiclePosition.update_stop_sequence(vp, 2)
assert stop_time_updates_for_vehicle(vp, Enum.drop(stus, 2)) == Enum.drop(stus, 1)
end

test "uses updated stop time updates for future changes" do
trip_id = "before_stop_test"

vp =
VehiclePosition.new(
id: "vehicle",
trip_id: trip_id,
stop_sequence: 1,
latitude: 1.0,
longitude: 1.0
)

stus =
for stop_sequence <- 1..4 do
StopTimeUpdate.new(trip_id: trip_id, stop_sequence: stop_sequence)
end

assert stop_time_updates_for_vehicle(vp, stus) == stus
vp = VehiclePosition.update_stop_sequence(vp, 2)

new_stus =
for stop_sequence <- 3..4 do
StopTimeUpdate.new(trip_id: trip_id, stop_sequence: stop_sequence, arrival_time: 5)
end

# we expect one old update, plus the two new ones
expected = Enum.slice(stus, 1..1) ++ new_stus
assert stop_time_updates_for_vehicle(vp, new_stus) == expected
end
end

describe "missing ETS table" do
test "stop_time_updates_for_vehicle returns same updates" do
vp = VehiclePosition.new(latitude: 1, longitude: 1)
stu = StopTimeUpdate.new([])
assert stop_time_updates_for_vehicle(vp, [stu]) == [stu]
end
end
end
47 changes: 47 additions & 0 deletions test/concentrate/group_filter/vehicle_before_stop_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
defmodule Concentrate.GroupFilter.VehicleBeforeStopTest do
@moduledoc false
use ExUnit.Case
import Concentrate.GroupFilter.VehicleBeforeStop
alias Concentrate.{TripUpdate, VehiclePosition, StopTimeUpdate}

describe "filter/1" do
setup do
{:ok, _} = start_supervised(Concentrate.GroupFilter.Cache.VehicleBeforeStop)
:ok
end

test "restores older StopTimeUpdate values if the vehicle hasn't reached them yet" do
trip_id = "before_stop_test"
tu = TripUpdate.new(trip_id: trip_id)

vp =
VehiclePosition.new(
id: "vehicle",
trip_id: trip_id,
stop_sequence: 1,
latitude: 1.0,
longitude: 1.0
)

stus =
for stop_sequence <- 1..4 do
StopTimeUpdate.new(trip_id: trip_id, stop_sequence: stop_sequence)
end

group = {tu, [vp], stus}
assert filter(group) == group
# restores the first two StopTimeUpdates since the vehicle hasn't
# reached them
assert filter({tu, [vp], Enum.drop(stus, 2)}) == group

# restores the second StopTimeUpdate since the vehicle is past the
# first one
vp = VehiclePosition.update_stop_sequence(vp, 2)
assert filter({tu, [vp], Enum.drop(stus, 2)}) == {tu, [vp], Enum.drop(stus, 1)}
end

test "ignores unknown values" do
assert filter(:value) == :value
end
end
end
4 changes: 2 additions & 2 deletions test/concentrate/supervisor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ defmodule Concentrate.SupervisorTest do

describe "children/1" do
test "builds the right number of children" do
# currently, the right number is 4: HTTP pool, alerts, GTFS, pipeline
# currently, the right number is 5: HTTP pool, alerts, GTFS, cache, pipeline
actual = children([])

assert length(actual) == 4
assert length(actual) == 5
end
end
end

0 comments on commit c0a67d0

Please sign in to comment.