-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
117 lines (98 loc) · 3.63 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// Package main listens for interruption events from the specified InterruptionNotifier,
// incrementing a counter every time an event is received
package main
import (
"context"
"fmt"
"log"
"os"
"sync"
gcppubsub "cloud.google.com/go/pubsub"
"github.com/thought-machine/spot-interruption-exporter/internal/cache"
"github.com/thought-machine/spot-interruption-exporter/internal/compute"
"github.com/thought-machine/spot-interruption-exporter/internal/events"
"github.com/thought-machine/spot-interruption-exporter/internal/handlers"
"github.com/thought-machine/spot-interruption-exporter/internal/metrics"
"go.uber.org/zap"
)
func main() {
if err := run(); err != nil {
log.Fatal(err)
}
}
func run() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cfg, err := LoadConfig(os.Getenv("CONFIG_PATH"))
if err != nil {
return fmt.Errorf("failed to load app configuration: %s", err.Error())
}
logger := configureLogger(cfg)
m := metrics.NewClient(logger)
m.ServeMetrics(cfg.Prometheus.Path, cfg.Prometheus.Port)
interruptionEvents, err := createSubscriptionClient(ctx, logger, cfg.Project, cfg.PubSub.InstanceInterruptionSubscriptionName)
if err != nil {
return fmt.Errorf("failed to init instance interruption subscription: %s", err.Error())
}
creationEvents, err := createSubscriptionClient(ctx, logger, cfg.Project, cfg.PubSub.InstanceCreationSubscriptionName)
if err != nil {
return fmt.Errorf("failed to init instance creation subscription: %s", err.Error())
}
computeClient, err := createComputeClient(ctx, logger, cfg)
if err != nil {
return fmt.Errorf("failed to init compute client")
}
initialInstances, err := computeClient.ListInstancesBelongingToKubernetesCluster(ctx)
if err != nil {
return fmt.Errorf("failed to determine initial instances belonging to kubernetes clusters: %s", err.Error())
}
interruptions := make(chan *gcppubsub.Message, 30)
additions := make(chan *gcppubsub.Message, 30)
instanceToClusterMappings := cache.NewCacheWithTTLFrom(cache.NoExpiration, initialInstances)
wg := &sync.WaitGroup{}
wg.Add(2)
go interruptionEvents.Receive(ctx, interruptions)
go creationEvents.Receive(ctx, additions)
logger.Info("listening for instance creation & interruption events")
go handlers.HandleInterruptionEvents(interruptions, instanceToClusterMappings, m, logger, wg)
go handlers.HandleCreationEvents(additions, instanceToClusterMappings, logger, wg)
logger.Info("handlers started for instance creation & interruption events")
wg.Wait()
return nil
}
func createComputeClient(ctx context.Context, log *zap.SugaredLogger, cfg Config) (compute.Client, error) {
return compute.NewClient(ctx, compute.NewClientInput{
Logger: log,
ProjectID: cfg.Project,
})
}
func createSubscriptionClient(ctx context.Context, log *zap.SugaredLogger, projectID, subscriptionName string) (events.Subscription, error) {
return events.NewPubSubNotifier(ctx, &events.PubSubNotifierInput{
Logger: log,
ProjectID: projectID,
SubscriptionName: subscriptionName,
})
}
func configureLogger(cfg Config) *zap.SugaredLogger {
loggerConfig := zap.NewProductionConfig()
if err := configureLogLevel(&loggerConfig, cfg.LogLevel); err != nil {
log.Fatalf("failed to parse log level: %s", err.Error())
}
loggerConfig.EncoderConfig.TimeKey = "time"
logger, err := loggerConfig.Build()
if err != nil {
log.Fatalf("failed to initialize zap logger: %v", err)
}
return logger.Sugar()
}
func configureLogLevel(lCfg *zap.Config, logLevel string) error {
if len(logLevel) == 0 {
return nil
}
l, err := zap.ParseAtomicLevel(logLevel)
if err != nil {
return err
}
lCfg.Level = l
return nil
}