Skip to content

Commit

Permalink
Additional RT trip validations
Browse files Browse the repository at this point in the history
  • Loading branch information
irees committed Nov 8, 2023
1 parent cc51ca2 commit 9645928
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 12 deletions.
128 changes: 125 additions & 3 deletions rt/validator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rt

import (
"fmt"
"time"

"github.com/interline-io/transitland-lib/internal/xy"
Expand All @@ -12,6 +13,10 @@ import (
type tripInfo struct {
UsesFrequency bool
DirectionID int
ServiceID string
ShapeID string
StartTime tt.WideTime
EndTime tt.WideTime
}

type stopInfo struct {
Expand All @@ -28,6 +33,7 @@ type Validator struct {
tripInfo map[string]tripInfo
routeInfo map[string]routeInfo
stopInfo map[string]stopInfo
services map[string]*tl.Service
geomCache *xy.GeomCache // shared with copier
}

Expand All @@ -37,6 +43,7 @@ func NewValidator() *Validator {
tripInfo: map[string]tripInfo{},
routeInfo: map[string]routeInfo{},
stopInfo: map[string]stopInfo{},
services: map[string]*tl.Service{},
geomCache: xy.NewGeomCache(),
}
}
Expand Down Expand Up @@ -73,8 +80,19 @@ func (fi *Validator) Validate(ent tl.Entity) []error {
fi.stopInfo[v.StopID] = stopInfo{LocationType: v.LocationType}
case *tl.Route:
fi.routeInfo[v.RouteID] = routeInfo{RouteType: v.RouteType}
case *tl.Service:
fi.services[v.ServiceID] = v
case *tl.Trip:
fi.tripInfo[v.TripID] = tripInfo{DirectionID: v.DirectionID}
ti := tripInfo{
DirectionID: v.DirectionID,
ServiceID: v.ServiceID,
ShapeID: v.ShapeID.String(),
}
if len(v.StopTimes) > 0 {
ti.StartTime = v.StopTimes[0].DepartureTime
ti.EndTime = v.StopTimes[len(v.StopTimes)-1].ArrivalTime
}
fi.tripInfo[v.TripID] = ti
case *tl.Frequency:
a := fi.tripInfo[v.TripID]
a.UsesFrequency = true
Expand Down Expand Up @@ -143,7 +161,7 @@ func (fi *Validator) ValidateFeedEntity(ent *pb.FeedEntity, current *pb.FeedMess
errs = append(errs, fi.ValidateTripUpdate(ent.GetTripUpdate(), current)...)
}
if ent.Vehicle != nil {
// TODO: ValidateVehiclePosition
errs = append(errs, fi.ValidateVehiclePosition(ent.GetVehicle())...)
}
if ent.Alert != nil {
// TODO: ValidateAlert
Expand Down Expand Up @@ -250,7 +268,7 @@ func (fi *Validator) ValidateStopTimeUpdate(st *pb.TripUpdate_StopTimeUpdate, cu
case pb.TripUpdate_StopTimeUpdate_SKIPPED:
// ok
}
if st.GetArrival().GetTime() > st.GetDeparture().GetTime() {
if st.GetArrival().GetTime() > 0 && st.GetDeparture().GetTime() > 0 && st.GetArrival().GetTime() > st.GetDeparture().GetTime() {
errs = append(errs, ne("StopTimeUpdate arrival time is later than departure time", 25))
}
// ValidateStopTimeEvent .
Expand Down Expand Up @@ -302,6 +320,110 @@ func (fi *Validator) ValidateTripDescriptor(td *pb.TripDescriptor, current *pb.F
return errs
}

func (fi *Validator) ActiveTrips(now time.Time) []string {
var ret []string
nowWt := tt.NewWideTimeFromSeconds(now.Hour()*3600 + now.Minute()*60 + now.Second())
nowSvc := map[string]bool{}
tripHasUpdate := map[string]bool{}
msgTripIds := map[string]bool{}
for k, v := range fi.tripInfo {
svc, ok := fi.services[v.ServiceID]
if !ok {
// log.Debug().
// Str("service", v.ServiceID).
// Str("trip", k).
// Msg("no service, skipping")
continue
}
sched, ok := nowSvc[svc.ServiceID]
if !ok {
sched = svc.IsActive(now)
nowSvc[svc.ServiceID] = sched
}
if !sched {
// log.Debug().
// Str("date", now.Format("2006-02-03")).
// Str("service", v.ServiceID).
// Str("trip", k).
// Msg("not scheduled, skipping")
continue
}
if v.StartTime.Seconds > nowWt.Seconds || v.EndTime.Seconds < nowWt.Seconds {
// log.Debug().
// Str("date", now.Format("2006-02-03")).
// Str("cur_time", nowWt.String()).
// Str("trip_start", v.StartTime.String()).
// Str("trip_end", v.EndTime.String()).
// Str("service", v.ServiceID).
// Str("trip", k).
// Msg("outside time, skipping")
continue
}
ret = append(ret, k)
tripHasUpdate[k] = false
if msgTripIds[k] {
tripHasUpdate[k] = true
}
}
return ret
}

type TripUpdateStats struct {
TripScheduledCount int `json:"trip_scheduled_count,omitempty"`
TripUpdateMatchCount int `json:"trip_update_match_count,omitempty"`
Date time.Time `json:"date,omitempty"`
}

func (fi *Validator) TripUpdateStats(now time.Time, msg *pb.FeedMessage) (TripUpdateStats, error) {
schedTrips := fi.ActiveTrips(now)
tripHasUpdate := map[string]bool{}
msgTripIds := map[string]bool{}
for _, ent := range msg.Entity {
if tu := ent.TripUpdate; tu != nil {
msgTripIds[tu.GetTrip().GetTripId()] = true
}
}
for _, k := range schedTrips {
tripHasUpdate[k] = false
if msgTripIds[k] {
tripHasUpdate[k] = true
}
}
tuCount := 0
for _, v := range tripHasUpdate {
if v {
tuCount += 1
}
}
return TripUpdateStats{
TripScheduledCount: len(tripHasUpdate),
TripUpdateMatchCount: tuCount,
Date: now,
}, nil
}

func (fi *Validator) ValidateVehiclePosition(ent *pb.VehiclePosition) (errs []error) {
pos := ent.GetPosition()
posPt := xy.Point{Lon: float64(pos.GetLongitude()), Lat: float64(pos.GetLatitude())}
tripHasPosition := map[string]bool{}
if td := ent.Trip; td != nil && pos != nil {
tripId := td.GetTripId()
trip, ok := fi.tripInfo[tripId]
shp := fi.geomCache.GetShape(trip.ShapeID)
tripHasPosition[tripId] = true
if ok && trip.ShapeID != "" && len(shp) > 0 {
fmt.Println("Vehicle position:", posPt)
nearestPoint, _ := xy.LineClosestPoint(shp, posPt)
fmt.Println("\ttrip:", tripId, "shape:", trip.ShapeID)
fmt.Println("\tnearestPoint:", nearestPoint, "dist:", xy.DistanceHaversine(nearestPoint.Lon, nearestPoint.Lat, posPt.Lon, posPt.Lat))
}
}
for _, schedTrip := range fi.ActiveTrips(time.Now()) {
_ = schedTrip
}
return nil
}

func ne(msg string, code int) *RealtimeError {
return &RealtimeError{
Code: code,
Expand Down
26 changes: 24 additions & 2 deletions rt/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package rt
import (
"testing"

"github.com/interline-io/transitland-lib/adapters/empty"
"github.com/interline-io/transitland-lib/copier"
"github.com/interline-io/transitland-lib/internal/testutil"
"github.com/interline-io/transitland-lib/tlcsv"
)
Expand Down Expand Up @@ -58,10 +60,30 @@ func TestValidateTripUpdate(t *testing.T) {
}
}

func TestValidateVehiclePosition(t *testing.T) {
func TestValidateAlert(t *testing.T) {

}

func TestValidateAlert(t *testing.T) {
func TestValidateTripUpdatePercentage(t *testing.T) {

}

func TestValidateVehiclePositions(t *testing.T) {
r, err := tlcsv.NewReader(testutil.RelPath("test/data/rt/ct.zip"))
if err != nil {
t.Fatal(err)
}
msg, err := ReadFile(testutil.RelPath("test/data/rt/ct-vehicle-positions.pb"))
if err != nil {
t.Error(err)
}
cp, err := copier.NewCopier(r, &empty.Writer{}, copier.Options{})
if err != nil {
t.Fatal(err)
}
ex := NewValidator()
cp.AddExtension(ex)
result := cp.Copy()
_ = result
ex.ValidateFeedMessage(msg, nil)
}
Binary file added test/data/rt/ct-service-alerts.pb
Binary file not shown.
Binary file added test/data/rt/ct-trip-updates.pb
Binary file not shown.
Binary file added test/data/rt/ct-vehicle-positions.pb
Binary file not shown.
Binary file added test/data/rt/ct.zip
Binary file not shown.
8 changes: 5 additions & 3 deletions validator/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package validator
import (
"github.com/interline-io/transitland-lib/copier"
"github.com/interline-io/transitland-lib/dmfr"
"github.com/interline-io/transitland-lib/rt"
"github.com/interline-io/transitland-lib/tl"
)

Expand All @@ -24,7 +25,8 @@ type Result struct {
}

type RealtimeResult struct {
Url string `json:"url"`
Json map[string]any `json:"json"`
Errors []error
Url string `json:"url"`
Json map[string]any `json:"json"`
TripUpdateStats rt.TripUpdateStats `json:"trip_update_stats"`
Errors []error
}
16 changes: 12 additions & 4 deletions validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"path/filepath"
"time"

"github.com/interline-io/transitland-lib/adapters/empty"
"github.com/interline-io/transitland-lib/copier"
Expand Down Expand Up @@ -160,21 +161,28 @@ func (v *Validator) Validate() (*Result, error) {

// Validate realtime messages
for _, fn := range v.Options.ValidateRealtimeMessages {
rtResult := RealtimeResult{
Url: fn,
}
var rterrs []error
msg, err := rt.ReadURL(fn, request.WithMaxSize(v.Options.MaxRTMessageSize))
if err != nil {
rterrs = append(rterrs, err)
} else {
rterrs = v.rtValidator.ValidateFeedMessage(msg, nil)
tz, _ := time.LoadLocation("America/Los_Angeles")
tripUpdateStats, err := v.rtValidator.TripUpdateStats(time.Now().In(tz), msg)
if err != nil {
rterrs = append(rterrs, err)
} else {
rtResult.TripUpdateStats = tripUpdateStats
}
}
result.HandleError(filepath.Base(fn), rterrs)
if len(rterrs) > v.Options.ErrorLimit {
rterrs = rterrs[0:v.Options.ErrorLimit]
}
rtResult := RealtimeResult{
Url: fn,
Errors: rterrs,
}
rtResult.Errors = rterrs
if v.Options.IncludeRealtimeJson && msg != nil {
rtJson, err := protojson.Marshal(msg)
if err != nil {
Expand Down

0 comments on commit 9645928

Please sign in to comment.