-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
190 lines (147 loc) · 4.91 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package main
import (
"flag"
"fmt"
"io"
"net/http"
"os"
"github.com/gprossliner/xhdl"
"github.com/world-direct/suss"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
)
var (
fBindAddress string
fKubeConfig string
fNodeName string
fLeaseNamespace string
fConsiderSoleReplicasCritical bool
fConsiderStatefulSetCritical bool
service suss.Service
)
func main() {
flag.StringVar(&fBindAddress, "bindAddress", "localhost:9993", "address to bind http socket")
flag.StringVar(&fKubeConfig, "kubeconfig", "", "kubeconfig to use, if not set InClusterConfig is used, can be set by KUBECONFIG envvar")
flag.StringVar(&fNodeName, "nodename", "", "the name of the node running the service. Can be set by NODENAME envvar")
flag.StringVar(&fLeaseNamespace, "leasenamespace", "", "the namespace for the lease, can be set by the NAMESPACE envvar")
flag.BoolVar(&fConsiderStatefulSetCritical, "considerStatefulSetCritical", false, "all pods part of a statefulset are critical")
flag.BoolVar(&fConsiderSoleReplicasCritical, "considerSoleReplicasCritical", false, "all pods part of a replicaset with only one replica are critical")
// klog.InitFlags(flag.CommandLine)
flag.Parse()
err := xhdl.Run(initService)
if err != nil {
klog.Error(err)
os.Exit(1)
}
http.HandleFunc("/version", cmdVersion)
http.HandleFunc("/healthz", cmdHealthz)
http.HandleFunc("/logstream", cmdLogStream)
http.HandleFunc("/criticalpods", cmdCriticalPods)
registerCommand("synchronize", func(ctx xhdl.Context) { service.Synchronize(ctx) })
registerCommand("teardown", func(ctx xhdl.Context) { service.Teardown(ctx) })
registerCommand("release", func(ctx xhdl.Context) { service.Release(ctx) })
registerCommand("releasedelayed", func(ctx xhdl.Context) { service.ReleaseDelayed(ctx) })
registerCommand("testfail", func(ctx xhdl.Context) { service.TestFail(ctx) })
klog.Infof("listen on %s\n", fBindAddress)
err = http.ListenAndServe(fBindAddress, nil)
if err != nil {
klog.Error(err)
os.Exit(1)
}
}
func getCommandHandler(name string, fn func(ctx xhdl.Context)) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
klog.Infof("/%s\n", name)
// setup own logger to context so that log messages can be
// written to the client
defaultlog := klog.FromContext(r.Context())
sink := defaultlog.GetSink()
ctxlog := defaultlog.WithSink(rwsink{sink})
myctx := klog.NewContext(r.Context(), ctxlog)
err := xhdl.RunContext(myctx, func(ctx xhdl.Context) {
fn(ctx)
})
if err != nil {
klog.Error(err.Error())
w.WriteHeader(500)
io.WriteString(w, err.Error())
}
}
}
func registerCommand(name string, fn func(ctx xhdl.Context)) {
http.HandleFunc("/"+name, getCommandHandler(name, fn))
}
func initService(ctx xhdl.Context) {
// validate nodename
if fNodeName == "" {
fNodeName = os.Getenv("NODENAME")
}
if fNodeName == "" {
ctx.Throw(fmt.Errorf("--nodename arg or NODENAME env var required"))
}
// kubeconfig and create Config struct
k8sConfig := getK8sConfig(ctx)
k8s := kubernetes.NewForConfigOrDie(k8sConfig)
// namespace handling
if fLeaseNamespace == "" {
fLeaseNamespace = os.Getenv("NAMESPACE")
if fLeaseNamespace == "" {
ctx.Throw(fmt.Errorf("--leasenamespace arg or NAMESPACE env var required"))
}
}
klog.Infof("using namespace %s for the Lease", fLeaseNamespace)
// init options
opt := suss.SussOptions{
NodeName: fNodeName,
LeaseNamespace: fLeaseNamespace,
K8s: k8s,
ConsiderStatefulSetCritical: fConsiderStatefulSetCritical,
ConsiderSoleReplicasCritical: fConsiderSoleReplicasCritical,
}
// and create service
service = suss.NewService(opt)
// perform start tasks, like delayed release
service.Start(ctx)
}
func getK8sConfig(ctx xhdl.Context) *rest.Config {
if fKubeConfig == "" {
fKubeConfig = os.Getenv("KUBECONFIG")
}
if fKubeConfig == "" {
klog.Infof("using InClusterConfig\n")
k8sConfig, err := rest.InClusterConfig()
ctx.Throw(err)
return k8sConfig
} else {
klog.Infof("using configfile %s\n", fKubeConfig)
k8sConfig, err := clientcmd.BuildConfigFromFlags("", fKubeConfig)
ctx.Throw(err)
return k8sConfig
}
}
func cmdVersion(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, VERSION+"\n")
}
func cmdHealthz(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, "OK\n")
}
func cmdLogStream(w http.ResponseWriter, r *http.Request) {
registerLogWriter(w)
// this sleeps until client cancels
<-r.Context().Done()
unregisterLogWriter(w)
}
func cmdCriticalPods(w http.ResponseWriter, r *http.Request) {
err := xhdl.RunContext(r.Context(), func(ctx xhdl.Context) {
criticalPods := service.GetCriticalPods(ctx)
for _, p := range criticalPods {
io.WriteString(w, p+"\n")
}
})
if err != nil {
io.WriteString(w, err.Error())
w.WriteHeader(500)
}
}