-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
68 lines (58 loc) · 1.49 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main
import (
"fmt"
"math/rand"
"time"
"github.com/go-pg/pg/v10"
"github.com/google/uuid"
"github.com/upmkuhn/kafka-trades-demo/pkg/schema"
"github.com/upmkuhn/kafka-trades-demo/pkg/stocks"
)
// demoDb is the package-level go-pg database handle. It is assigned in init;
// the only other uses in this file are currently commented out.
var demoDb *pg.DB
// init prepares the package-level dependencies before main runs: it builds
// the go-pg handle stored in demoDb from hard-coded local settings and asks
// the stocks package to create its trade producer.
func init() {
	// Local development credentials; nothing here is read from the environment.
	dbOptions := pg.Options{
		Addr:     "127.0.0.1:5432",
		Database: "kafka-demo",
		User:     "user1",
		Password: "password",
	}
	demoDb = pg.Connect(&dbOptions)

	// NOTE(review): the arguments look like a broker address and a
	// topic/client name — confirm against stocks.CreateStockTradeProducer.
	stocks.CreateStockTradeProducer("localhost:9094", "kafka-trades-demo")
}
// main points the schema package at the local schema registry, calls
// stocks.RegisterSchemas and stocks.LoadSchemas, and then hands control to
// produceFakeTrades, which loops forever.
func main() {
	fmt.Println("Registering Schema")
	schema.ConnectSchemaRegistry("http://localhost:8081")
	stocks.RegisterSchemas()
	stocks.LoadSchemas()
	fmt.Println("Booting Server")
	// Database schema creation is disabled; the code is kept for reference.
	//defer demoDb.Close()
	//error := stocks.CreateSchema(demoDb)
	//if error != nil {
	// panic(error)
	//}
	// This message is printed even though the schema creation above is
	// commented out.
	fmt.Println("Created schema")
	// NOTE(review): produceFakeTrades never returns normally, so this deferred
	// call only runs if that call panics. If a Kafka connection is required
	// before publishing, this probably should not be deferred — confirm what
	// stocks.ConnectToKafka does before changing it.
	defer stocks.ConnectToKafka()
	produceFakeTrades()
}
// produceFakeTrades publishes one freshly generated fake trade after another
// via stocks.PublishStockTrade, sleeping a random whole number of seconds in
// [0, 10) after each publish. It never returns.
func produceFakeTrades() {
	fmt.Println("Starting Fake messages")
	for {
		stocks.PublishStockTrade(makeFakeTrade())
		// Scale in time.Duration (int64) rather than int: on 32-bit platforms
		// int(time.Second) * n overflows for n >= 3.
		time.Sleep(time.Duration(rand.Intn(10)) * time.Second)
	}
}
// populateDb is currently a no-op stub: its only statement, an insert through
// demoDb, is commented out. Nothing in this file calls it.
func populateDb() {
	//demoDb.Model(trade).Insert()
}
// makeFakeTrade builds a randomly populated stock trade for demo traffic.
//
// The returned trade has a freshly generated UUID string as ID, a symbol
// picked from a fixed list, a quantity in [100, 70099], a trade type of either
// stocks.StockBought or stocks.StockSold, a price in [0, 99999998], and the
// current time as CREATED_AT.
func makeFakeTrade() *stocks.StockTrade {
	// NOTE(review): "APPL" may be a typo for Apple's ticker "AAPL"; left
	// unchanged because downstream consumers may match on the current value.
	symbols := []string{"TSLA", "APPL", "GOOG"}
	tradeTypes := []stocks.StockTradeType{stocks.StockBought, stocks.StockSold}

	return &stocks.StockTrade{
		ID:       uuid.New().String(),
		SYMBOL:   symbols[rand.Intn(len(symbols))],
		QUANTITY: int64(rand.Intn(70000) + 100),
		// Derive the bound from the slice so adding or removing a trade type
		// cannot leave a stale magic number behind.
		TYPE:       tradeTypes[rand.Intn(len(tradeTypes))],
		PRICE:      uint64(rand.Intn(99999999)),
		CREATED_AT: time.Now(),
	}
}