Skip to content

Commit

Permalink
Use a table-scan pull query for nearest carpark lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
rmoff committed Oct 6, 2021
1 parent 566a3a1 commit 3c4aa06
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 48 deletions.
77 changes: 29 additions & 48 deletions telegram-bot-carparks/go/ksqldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func checkSpaces(c string) (latestTS string, emptyPlaces float64, pctFull float6
client := ksqldb.NewClient(KSQLDB_ENDPOINT, KSQLDB_API_KEY, KSQLDB_API_SECRET).Debug()

k := "SELECT LATEST_TS, CURRENT_EMPTY_PLACES, CURRENT_PCT_FULL FROM CARPARK WHERE NAME='" + c + "';"
_, r, e := client.Pull(ctx, k)
_, r, e := client.Pull(ctx, k, false)

if e != nil {
// handle the error better here, e.g. check for no rows returned
Expand Down Expand Up @@ -112,64 +112,45 @@ func alertSpaces(a chan<- string, c int) (e error) {
// than 10 spaces available.
func getClosest(lat float64, lon float64) (c carPark, e error) {
const availableThreshold = 10
const queryResultTimeoutSeconds = 20

var cps carParks

ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
defer cancel()

client := ksqldb.NewClient(KSQLDB_ENDPOINT, KSQLDB_API_KEY, KSQLDB_API_SECRET).Debug()

// Prepare the request
k := "SELECT NAME AS CARPARK, TIMESTAMPTOSTRING(TS,'yyyy-MM-dd HH:mm:ss','Europe/London'), GEO_DISTANCE(CAST(" + fmt.Sprintf("%v", lat) + " AS DOUBLE), "
k := "SELECT NAME AS CARPARK, LATEST_TS, GEO_DISTANCE(CAST(" + fmt.Sprintf("%v", lat) + " AS DOUBLE), "
k += " CAST(" + fmt.Sprintf("%v", lon) + " AS DOUBLE), LATITUDE, LONGITUDE) AS DISTANCE_TO_CARPARK_KM, "
k += " EMPTY_PLACES, ((CAST(CAPACITY as DOUBLE) - CAST(EMPTY_PLACES AS DOUBLE)) / CAST(CAPACITY AS DOUBLE)) * 100 AS PCT_FULL, "
k += " CURRENT_EMPTY_PLACES, ((CAST(CAPACITY as DOUBLE) - CAST(CURRENT_EMPTY_PLACES AS DOUBLE)) / CAST(CAPACITY AS DOUBLE)) * 100 AS PCT_FULL, "
k += " CAPACITY, DIRECTIONSURL, LATITUDE,LONGITUDE"
k += " FROM CARPARK_EVENTS C "
k += " WHERE EMPTY_PLACES > " + fmt.Sprintf("%v", availableThreshold)
k += " AND ROWTIME > UNIX_TIMESTAMP()-(1000 * 60 * 5)"
k += " EMIT CHANGES;"
// k := "SELECT NAME AS CARPARK, LATEST_TS, GEO_DISTANCE(CAST(" + fmt.Sprintf("%v", lat) + " AS DOUBLE), "
// k += " CAST(" + fmt.Sprintf("%v", lon) + " AS DOUBLE), LATITUDE, LONGITUDE) AS DISTANCE_TO_CARPARK_KM, "
// k += " CURRENT_EMPTY_PLACES, CURRENT_PCT_FULL, "
// k += " CAPACITY, DIRECTIONSURL, LATITUDE,LONGITUDE"
// k += " FROM CARPARK C "
// k += " WHERE CURRENT_EMPTY_PLACES > " + fmt.Sprintf("%v", availableThreshold)
// k += " AND ROWTIME > UNIX_TIMESTAMP()-(1000 * 60 * 5)"
// k += " EMIT CHANGES;"
k += " FROM CARPARK C "
k += " WHERE CURRENT_EMPTY_PLACES > " + fmt.Sprintf("%v", availableThreshold)
k += ";"

// This Go routine will handle rows as and when they
// are sent to the channel
rc := make(chan ksqldb.Row)
hc := make(chan ksqldb.Header, 1)
_, r, e := client.Pull(ctx, k, true)

go func() {
if e != nil {
// handle the error better here, e.g. check for no rows returned
return c, fmt.Errorf("Error running Pull request against ksqlDB:\n%v", e)
}

for row := range rc {
if row != nil {
for _, row := range r {
if row != nil {

// Store the row values in the carPark object
c.name = row[0].(string)
c.ts = row[1].(string)
c.distanceKm = row[2].(float64)
c.emptyplaces = int64(row[3].(float64))
c.capacity = int64(row[5].(float64))
c.URL = row[6].(string)
c.lat, _ = row[7].(float64)
c.lon, _ = row[8].(float64)
// Add the carPark to the slice
cps = append(cps, c)
}
// Store the row values in the carPark object
c.name = row[0].(string)
c.ts = row[1].(string)
c.distanceKm = row[2].(float64)
c.emptyplaces = int64(row[3].(float64))
c.capacity = int64(row[5].(float64))
c.URL = row[6].(string)
c.lat, _ = row[7].(float64)
c.lon, _ = row[8].(float64)
// Add the carPark to the slice
cps = append(cps, c)
}
}()

// Do the request
ctx, cancel := context.WithTimeout(context.TODO(), queryResultTimeoutSeconds*time.Second)
defer cancel()

client := ksqldb.NewClient(KSQLDB_ENDPOINT, KSQLDB_API_KEY, KSQLDB_API_SECRET).Debug()

e = client.Push(ctx, k, "latest", rc, hc)

if e != nil {
// handle the error better here, e.g. check for no rows returned
return c, fmt.Errorf("Error running Push request against ksqlDB:\n%v", e)
}

// do clever return stuff here taking the list of carparks and sorting it by distance
Expand Down

0 comments on commit 3c4aa06

Please sign in to comment.