Skip to content

Commit

Permalink
WIP for NDC Sydney 2020
Browse files Browse the repository at this point in the history
  • Loading branch information
rmoff committed Oct 13, 2020
1 parent bf2372b commit 2c588dc
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 2 deletions.
1 change: 1 addition & 0 deletions telegram-bot-carparks/.gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
kafka-connect-rest
elastic-sink.json
data/connectors
go/telegram_config.go
go/ksqldb_config.go
16 changes: 16 additions & 0 deletions telegram-bot-carparks/elastic-sink.json.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"topics": "CARPARK_EVENTS_v00",
"input.data.format": "PROTOBUF",
"connector.class": "ElasticsearchSink",
"name": "ElasticsearchSinkConnector_1",
"kafka.api.key": "",
"kafka.api.secret": "",
"connection.url": "https://xxxxxxxx.eu-west-1.aws.found.io:9243",
"connection.username": "elastic",
"connection.password": "",
"type.name": "_doc",
"key.ignore": "true",
"schema.ignore": "true",
"behavior.on.malformed.documents": "fail",
"tasks.max": "1"
}
14 changes: 13 additions & 1 deletion telegram-bot-carparks/go/ksqldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
ksqldb "github.com/rmoff/ksqldb-go"
)

// Takes the name of a carpark and returns the number
// of empty places, the percentage occupied, and
// the timestamp of the data reported
func checkSpaces(c string) (latestTS string, emptyPlaces float64, pctFull float64, err error) {

ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
Expand Down Expand Up @@ -37,7 +40,13 @@ func checkSpaces(c string) (latestTS string, emptyPlaces float64, pctFull float6

}

// Returns a channel, to which a carpark's details are
// written each time data is received in which more
// than the specified number of places are available
func alertSpaces(a chan<- string, c int) (e error) {
// Whilst this is implemented using ksqlDB, perhaps it could be done
// using a normal Kafka consumer and use Golang to apply the filter
//
// TODO01 add a channel so that user can run another
// command to delete an alert
//
Expand All @@ -47,7 +56,7 @@ func alertSpaces(a chan<- string, c int) (e error) {

// Run this for five minutes and then exit
// TODO: does this actually do this?
const queryResultTimeoutSeconds = 600
const queryResultTimeoutSeconds = 300

// Prepare the request
k := "SELECT NAME, TS, CAPACITY, EMPTY_PLACES"
Expand Down Expand Up @@ -98,6 +107,9 @@ func alertSpaces(a chan<- string, c int) (e error) {
return nil
}

// Returns the details of the carpark that is nearest
// to the provided location (lat/long), with more
// than 10 spaces available.
func getClosest(lat float64, lon float64) (c carPark, e error) {
const availableThreshold = 10
const queryResultTimeoutSeconds = 20
Expand Down
2 changes: 1 addition & 1 deletion telegram-bot-carparks/go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func main() {

chatID = update.Message.Chat.ID
t := update.Message.Text
log.Printf("[%s] %s (command: %v)", update.Message.From.UserName, t, update.Message.IsCommand())
log.Printf("Received message from %s: %s (command: %v)", update.Message.From.UserName, t, update.Message.IsCommand())
switch {
case update.Message.IsCommand():
// Handle commands
Expand Down

0 comments on commit 2c588dc

Please sign in to comment.